Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(747)

Side by Side Diff: extensions/browser/api/cast_channel/cast_socket.cc

Issue 505453002: Create dedicated class for handling wire message formatting and parsing. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "extensions/browser/api/cast_channel/cast_socket.h" 5 #include "extensions/browser/api/cast_channel/cast_socket.h"
6 6
7 #include <stdlib.h> 7 #include <stdlib.h>
8 #include <string.h> 8 #include <string.h>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 29 matching lines...) Expand all
40 #define VLOG_WITH_CONNECTION(level) VLOG(level) << "[" << \ 40 #define VLOG_WITH_CONNECTION(level) VLOG(level) << "[" << \
41 ip_endpoint_.ToString() << ", auth=" << channel_auth_ << "] " 41 ip_endpoint_.ToString() << ", auth=" << channel_auth_ << "] "
42 42
43 namespace { 43 namespace {
44 44
45 // The default keepalive delay. On Linux, keepalives probes will be sent after 45 // The default keepalive delay. On Linux, keepalives probes will be sent after
46 // the socket is idle for this length of time, and the socket will be closed 46 // the socket is idle for this length of time, and the socket will be closed
47 // after 9 failed probes. So the total idle time before close is 10 * 47 // after 9 failed probes. So the total idle time before close is 10 *
48 // kTcpKeepAliveDelaySecs. 48 // kTcpKeepAliveDelaySecs.
49 const int kTcpKeepAliveDelaySecs = 10; 49 const int kTcpKeepAliveDelaySecs = 10;
50 50 // Size of a CastSocket header payload.
51 const size_t kHeaderSizeBytes = sizeof(int32);
52 // Maximum byte count for a CastSocket message.
53 const size_t kMaxMessageSizeBytes = 65536;
51 } // namespace 54 } // namespace
52 55
53 namespace extensions { 56 namespace extensions {
54 57
55 static base::LazyInstance<BrowserContextKeyedAPIFactory< 58 static base::LazyInstance<BrowserContextKeyedAPIFactory<
56 ApiResourceManager<core_api::cast_channel::CastSocket> > > g_factory = 59 ApiResourceManager<core_api::cast_channel::CastSocket> > > g_factory =
57 LAZY_INSTANCE_INITIALIZER; 60 LAZY_INSTANCE_INITIALIZER;
58 61
59 // static 62 // static
60 template <> 63 template <>
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
182 ChannelAuthType channel_auth, 185 ChannelAuthType channel_auth,
183 CastSocket::Delegate* delegate, 186 CastSocket::Delegate* delegate,
184 net::NetLog* net_log, 187 net::NetLog* net_log,
185 const base::TimeDelta& timeout, 188 const base::TimeDelta& timeout,
186 const scoped_refptr<Logger>& logger) 189 const scoped_refptr<Logger>& logger)
187 : ApiResource(owner_extension_id), 190 : ApiResource(owner_extension_id),
188 channel_id_(0), 191 channel_id_(0),
189 ip_endpoint_(ip_endpoint), 192 ip_endpoint_(ip_endpoint),
190 channel_auth_(channel_auth), 193 channel_auth_(channel_auth),
191 delegate_(delegate), 194 delegate_(delegate),
192 current_message_size_(0),
193 current_message_(new CastMessage()),
194 net_log_(net_log), 195 net_log_(net_log),
195 logger_(logger), 196 logger_(logger),
196 connect_timeout_(timeout), 197 connect_timeout_(timeout),
197 connect_timeout_timer_(new base::OneShotTimer<CastSocket>), 198 connect_timeout_timer_(new base::OneShotTimer<CastSocket>),
198 is_canceled_(false), 199 is_canceled_(false),
200 current_message_(new CastMessage),
199 connect_state_(CONN_STATE_NONE), 201 connect_state_(CONN_STATE_NONE),
200 write_state_(WRITE_STATE_NONE), 202 write_state_(WRITE_STATE_NONE),
201 read_state_(READ_STATE_NONE), 203 read_state_(READ_STATE_NONE),
202 error_state_(CHANNEL_ERROR_NONE), 204 error_state_(CHANNEL_ERROR_NONE),
203 ready_state_(READY_STATE_NONE) { 205 ready_state_(READY_STATE_NONE) {
204 DCHECK(net_log_); 206 DCHECK(net_log_);
205 DCHECK(channel_auth_ == CHANNEL_AUTH_TYPE_SSL || 207 DCHECK(channel_auth_ == CHANNEL_AUTH_TYPE_SSL ||
206 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED); 208 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED);
207 net_log_source_.type = net::NetLog::SOURCE_SOCKET; 209 net_log_source_.type = net::NetLog::SOURCE_SOCKET;
208 net_log_source_.id = net_log_->NextID(); 210 net_log_source_.id = net_log_->NextID();
209 211
210 // Reuse these buffers for each message. 212 // Buffer is reused across messages.
211 header_read_buffer_ = new net::GrowableIOBuffer(); 213 read_buffer_ = new net::GrowableIOBuffer();
212 header_read_buffer_->SetCapacity(MessageHeader::header_size()); 214 read_buffer_->SetCapacity(kMaxMessageSizeBytes);
213 body_read_buffer_ = new net::GrowableIOBuffer(); 215 framer_.reset(new PacketFramer(read_buffer_));
214 body_read_buffer_->SetCapacity(MessageHeader::max_message_size());
215 current_read_buffer_ = header_read_buffer_;
216 } 216 }
217 217
218 CastSocket::~CastSocket() { 218 CastSocket::~CastSocket() {
219 // Ensure that resources are freed but do not run pending callbacks to avoid 219 // Ensure that resources are freed but do not run pending callbacks to avoid
220 // any re-entrancy. 220 // any re-entrancy.
221 CloseInternal(); 221 CloseInternal();
222 } 222 }
223 223
224 ReadyState CastSocket::ready_state() const { 224 ReadyState CastSocket::ready_state() const {
225 return ready_state_; 225 return ready_state_;
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
265 ip_endpoint_); 265 ip_endpoint_);
266 266
267 return net::ClientSocketFactory::GetDefaultFactory()->CreateSSLClientSocket( 267 return net::ClientSocketFactory::GetDefaultFactory()->CreateSSLClientSocket(
268 connection.Pass(), host_and_port, ssl_config, context); 268 connection.Pass(), host_and_port, ssl_config, context);
269 } 269 }
270 270
271 bool CastSocket::ExtractPeerCert(std::string* cert) { 271 bool CastSocket::ExtractPeerCert(std::string* cert) {
272 DCHECK(cert); 272 DCHECK(cert);
273 DCHECK(peer_cert_.empty()); 273 DCHECK(peer_cert_.empty());
274 net::SSLInfo ssl_info; 274 net::SSLInfo ssl_info;
275 if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) 275 if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) {
276 return false; 276 return false;
277 }
277 278
278 logger_->LogSocketEvent(channel_id_, proto::SSL_INFO_OBTAINED); 279 logger_->LogSocketEvent(channel_id_, proto::SSL_INFO_OBTAINED);
279 280
280 bool result = net::X509Certificate::GetDEREncoded( 281 bool result = net::X509Certificate::GetDEREncoded(
281 ssl_info.cert->os_cert_handle(), cert); 282 ssl_info.cert->os_cert_handle(), cert);
282 if (result) 283 if (result) {
283 VLOG_WITH_CONNECTION(1) << "Successfully extracted peer certificate: " 284 VLOG_WITH_CONNECTION(1) << "Successfully extracted peer certificate: "
284 << *cert; 285 << *cert;
286 }
285 287
286 logger_->LogSocketEventWithRv( 288 logger_->LogSocketEventWithRv(
287 channel_id_, proto::DER_ENCODED_CERT_OBTAIN, result ? 1 : 0); 289 channel_id_, proto::DER_ENCODED_CERT_OBTAIN, result ? 1 : 0);
288 return result; 290 return result;
289 } 291 }
290 292
291 bool CastSocket::VerifyChallengeReply() { 293 bool CastSocket::VerifyChallengeReply() {
292 AuthResult result = AuthenticateChallengeReply(*challenge_reply_, peer_cert_); 294 AuthResult result = AuthenticateChallengeReply(*challenge_reply_, peer_cert_);
293 logger_->LogSocketChallengeReplyEvent(channel_id_, result); 295 logger_->LogSocketChallengeReplyEvent(channel_id_, result);
294 return result.success(); 296 return result.success();
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after
481 void CastSocket::DoAuthChallengeSendWriteComplete(int result) { 483 void CastSocket::DoAuthChallengeSendWriteComplete(int result) {
482 send_auth_challenge_callback_.Cancel(); 484 send_auth_challenge_callback_.Cancel();
483 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result; 485 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result;
484 DCHECK_GT(result, 0); 486 DCHECK_GT(result, 0);
485 DCHECK_EQ(write_queue_.size(), 1UL); 487 DCHECK_EQ(write_queue_.size(), 1UL);
486 PostTaskToStartConnectLoop(result); 488 PostTaskToStartConnectLoop(result);
487 } 489 }
488 490
489 int CastSocket::DoAuthChallengeSendComplete(int result) { 491 int CastSocket::DoAuthChallengeSendComplete(int result) {
490 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; 492 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result;
491 if (result < 0) 493 if (result < 0) {
492 return result; 494 return result;
495 }
493 SetConnectState(CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE); 496 SetConnectState(CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE);
494 497
495 // Post a task to start read loop so that DoReadLoop is not nested inside 498 // Post a task to start read loop so that DoReadLoop is not nested inside
496 // DoConnectLoop. This is not strictly necessary but keeps the read loop 499 // DoConnectLoop. This is not strictly necessary but keeps the read loop
497 // code decoupled from connect loop code. 500 // code decoupled from connect loop code.
498 PostTaskToStartReadLoop(); 501 PostTaskToStartReadLoop();
499 // Always return IO_PENDING since the result is always asynchronous. 502 // Always return IO_PENDING since the result is always asynchronous.
500 return net::ERR_IO_PENDING; 503 return net::ERR_IO_PENDING;
501 } 504 }
502 505
503 int CastSocket::DoAuthChallengeReplyComplete(int result) { 506 int CastSocket::DoAuthChallengeReplyComplete(int result) {
504 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeReplyComplete: " << result; 507 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeReplyComplete: " << result;
505 if (result < 0) 508 if (result < 0) {
506 return result; 509 return result;
507 if (!VerifyChallengeReply()) 510 }
511 if (!VerifyChallengeReply()) {
508 return net::ERR_FAILED; 512 return net::ERR_FAILED;
513 }
509 VLOG_WITH_CONNECTION(1) << "Auth challenge verification succeeded"; 514 VLOG_WITH_CONNECTION(1) << "Auth challenge verification succeeded";
510 return net::OK; 515 return net::OK;
511 } 516 }
512 517
513 void CastSocket::DoConnectCallback(int result) { 518 void CastSocket::DoConnectCallback(int result) {
514 SetReadyState((result == net::OK) ? READY_STATE_OPEN : READY_STATE_CLOSED); 519 SetReadyState((result == net::OK) ? READY_STATE_OPEN : READY_STATE_CLOSED);
515 if (result == net::OK) { 520 if (result == net::OK) {
516 SetErrorState(CHANNEL_ERROR_NONE); 521 SetErrorState(CHANNEL_ERROR_NONE);
517 PostTaskToStartReadLoop(); 522 PostTaskToStartReadLoop();
518 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; 523 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback";
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after
659 write_state_ != WRITE_STATE_NONE); 664 write_state_ != WRITE_STATE_NONE);
660 665
661 // No state change occurred in do-while loop above. This means state has 666 // No state change occurred in do-while loop above. This means state has
662 // transitioned to NONE. 667 // transitioned to NONE.
663 if (write_state_ == WRITE_STATE_NONE) { 668 if (write_state_ == WRITE_STATE_NONE) {
664 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); 669 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
665 } 670 }
666 671
667 // If write loop is done because the queue is empty then set write 672 // If write loop is done because the queue is empty then set write
668 // state to NONE 673 // state to NONE
669 if (write_queue_.empty()) 674 if (write_queue_.empty()) {
670 SetWriteState(WRITE_STATE_NONE); 675 SetWriteState(WRITE_STATE_NONE);
676 }
671 677
672 // Write loop is done - if the result is ERR_FAILED then close with error. 678 // Write loop is done - if the result is ERR_FAILED then close with error.
673 if (rv == net::ERR_FAILED) 679 if (rv == net::ERR_FAILED) {
674 CloseWithError(); 680 CloseWithError();
681 }
675 } 682 }
676 683
677 int CastSocket::DoWrite() { 684 int CastSocket::DoWrite() {
678 DCHECK(!write_queue_.empty()); 685 DCHECK(!write_queue_.empty());
679 WriteRequest& request = write_queue_.front(); 686 WriteRequest& request = write_queue_.front();
680 687
681 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " 688 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
682 << request.io_buffer->size() << " bytes_written " 689 << request.io_buffer->size() << " bytes_written "
683 << request.io_buffer->BytesConsumed(); 690 << request.io_buffer->BytesConsumed();
684 691
(...skipping 13 matching lines...) Expand all
698 if (result <= 0) { // NOTE that 0 also indicates an error 705 if (result <= 0) { // NOTE that 0 also indicates an error
699 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); 706 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
700 SetWriteState(WRITE_STATE_ERROR); 707 SetWriteState(WRITE_STATE_ERROR);
701 return result == 0 ? net::ERR_FAILED : result; 708 return result == 0 ? net::ERR_FAILED : result;
702 } 709 }
703 710
704 // Some bytes were successfully written 711 // Some bytes were successfully written
705 WriteRequest& request = write_queue_.front(); 712 WriteRequest& request = write_queue_.front();
706 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; 713 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
707 io_buffer->DidConsume(result); 714 io_buffer->DidConsume(result);
708 if (io_buffer->BytesRemaining() == 0) // Message fully sent 715 if (io_buffer->BytesRemaining() == 0) { // Message fully sent
709 SetWriteState(WRITE_STATE_DO_CALLBACK); 716 SetWriteState(WRITE_STATE_DO_CALLBACK);
710 else 717 } else {
711 SetWriteState(WRITE_STATE_WRITE); 718 SetWriteState(WRITE_STATE_WRITE);
719 }
712 720
713 return net::OK; 721 return net::OK;
714 } 722 }
715 723
716 int CastSocket::DoWriteCallback() { 724 int CastSocket::DoWriteCallback() {
717 DCHECK(!write_queue_.empty()); 725 DCHECK(!write_queue_.empty());
718 726
719 SetWriteState(WRITE_STATE_WRITE); 727 SetWriteState(WRITE_STATE_WRITE);
720 728
721 WriteRequest& request = write_queue_.front(); 729 WriteRequest& request = write_queue_.front();
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
817 } else { 825 } else {
818 // Connection is already established. Close and send error status via the 826 // Connection is already established. Close and send error status via the
819 // OnError delegate. 827 // OnError delegate.
820 CloseWithError(); 828 CloseWithError();
821 } 829 }
822 } 830 }
823 } 831 }
824 832
825 int CastSocket::DoRead() { 833 int CastSocket::DoRead() {
826 SetReadState(READ_STATE_READ_COMPLETE); 834 SetReadState(READ_STATE_READ_COMPLETE);
827 // Figure out whether to read header or body, and the remaining bytes. 835
828 uint32 num_bytes_to_read = 0; 836 // Determine how many bytes need to be read.
829 if (header_read_buffer_->RemainingCapacity() > 0) { 837 uint32 num_bytes_to_read = framer_->BytesRequested();
mark a. foltz 2014/08/26 20:37:02 Slightly prefer that this be a size_t and then is
Kevin M 2014/08/27 01:14:03 Done.
830 current_read_buffer_ = header_read_buffer_;
831 num_bytes_to_read = header_read_buffer_->RemainingCapacity();
832 CHECK_LE(num_bytes_to_read, MessageHeader::header_size());
833 } else {
834 DCHECK_GT(current_message_size_, 0U);
835 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset();
836 current_read_buffer_ = body_read_buffer_;
837 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size());
838 }
839 CHECK_GT(num_bytes_to_read, 0U);
840 838
841 // Read up to num_bytes_to_read into |current_read_buffer_|. 839 // Read up to num_bytes_to_read into |current_read_buffer_|.
842 int rv = socket_->Read( 840 int rv = socket_->Read(
843 current_read_buffer_.get(), 841 read_buffer_.get(),
844 num_bytes_to_read, 842 num_bytes_to_read,
845 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this))); 843 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this)));
846 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, rv); 844 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, rv);
847 845
848 return rv; 846 return rv;
849 } 847 }
850 848
851 int CastSocket::DoReadComplete(int result) { 849 int CastSocket::DoReadComplete(int result) {
852 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result 850 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result;
853 << " header offset = " 851
854 << header_read_buffer_->offset()
855 << " body offset = " << body_read_buffer_->offset();
856 if (result <= 0) { // 0 means EOF: the peer closed the socket 852 if (result <= 0) { // 0 means EOF: the peer closed the socket
857 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; 853 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket";
858 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); 854 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
859 SetReadState(READ_STATE_ERROR); 855 SetReadState(READ_STATE_ERROR);
860 return result == 0 ? net::ERR_FAILED : result; 856 return result == 0 ? net::ERR_FAILED : result;
861 } 857 }
862 858
863 // Some data was read. Move the offset in the current buffer forward. 859 size_t message_size;
864 CHECK_LE(current_read_buffer_->offset() + result, 860 if (framer_->Ingest(
865 current_read_buffer_->capacity()); 861 result, current_message_.get(), &message_size, &error_state_)) {
866 current_read_buffer_->set_offset(current_read_buffer_->offset() + result); 862 logger_->LogSocketEventForMessage(
867 863 channel_id_,
868 if (current_read_buffer_.get() == header_read_buffer_.get() && 864 proto::MESSAGE_READ,
869 current_read_buffer_->RemainingCapacity() == 0) { 865 current_message_->namespace_(),
870 // A full header is read, process the contents. 866 base::StringPrintf("Message size: %zu", message_size));
871 if (!ProcessHeader()) { 867 SetReadState(READ_STATE_DO_CALLBACK);
872 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); 868 } else if (error_state_ != CHANNEL_ERROR_NONE) {
873 SetReadState(READ_STATE_ERROR); 869 SetReadState(READ_STATE_ERROR);
874 } else {
875 // Processed header, now read the body.
876 SetReadState(READ_STATE_READ);
877 }
878 } else if (current_read_buffer_.get() == body_read_buffer_.get() &&
879 static_cast<uint32>(current_read_buffer_->offset()) ==
880 current_message_size_) {
881 // Store a copy of current_message_size_ since it will be reset by
882 // ProcessBody().
883 uint32 message_size = current_message_size_;
884 // Full body is read, process the contents.
885 if (ProcessBody()) {
886 logger_->LogSocketEventForMessage(
887 channel_id_,
888 proto::MESSAGE_READ,
889 current_message_->namespace_(),
890 base::StringPrintf("Message size: %u", message_size));
891 SetReadState(READ_STATE_DO_CALLBACK);
892 } else {
893 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
894 SetReadState(READ_STATE_ERROR);
895 }
896 } else { 870 } else {
897 // Have not received full header or full body yet; keep reading.
898 SetReadState(READ_STATE_READ); 871 SetReadState(READ_STATE_READ);
899 } 872 }
900
901 return net::OK; 873 return net::OK;
902 } 874 }
903 875
904 int CastSocket::DoReadCallback() { 876 int CastSocket::DoReadCallback() {
905 SetReadState(READ_STATE_READ); 877 SetReadState(READ_STATE_READ);
906 const CastMessage& message = *current_message_; 878 const CastMessage& message = *current_message_;
907 if (ready_state_ == READY_STATE_CONNECTING) { 879 if (ready_state_ == READY_STATE_CONNECTING) {
908 if (IsAuthMessage(message)) { 880 if (IsAuthMessage(message)) {
909 challenge_reply_.reset(new CastMessage(message)); 881 challenge_reply_.reset(new CastMessage(message));
910 logger_->LogSocketEvent(channel_id_, proto::RECEIVED_CHALLENGE_REPLY); 882 logger_->LogSocketEvent(channel_id_, proto::RECEIVED_CHALLENGE_REPLY);
(...skipping 22 matching lines...) Expand all
933 current_message_->Clear(); 905 current_message_->Clear();
934 906
935 return net::OK; 907 return net::OK;
936 } 908 }
937 909
938 int CastSocket::DoReadError(int result) { 910 int CastSocket::DoReadError(int result) {
939 DCHECK_LE(result, 0); 911 DCHECK_LE(result, 0);
940 return net::ERR_FAILED; 912 return net::ERR_FAILED;
941 } 913 }
942 914
943 bool CastSocket::ProcessHeader() {
944 CHECK_EQ(static_cast<uint32>(header_read_buffer_->offset()),
945 MessageHeader::header_size());
946 MessageHeader header;
947 MessageHeader::ReadFromIOBuffer(header_read_buffer_.get(), &header);
948 if (header.message_size > MessageHeader::max_message_size())
949 return false;
950 915
951 VLOG_WITH_CONNECTION(2) << "Parsed header { message_size: "
952 << header.message_size << " }";
953 current_message_size_ = header.message_size;
954 return true;
955 }
956
957 bool CastSocket::ProcessBody() {
958 CHECK_EQ(static_cast<uint32>(body_read_buffer_->offset()),
959 current_message_size_);
960 if (!current_message_->ParseFromArray(
961 body_read_buffer_->StartOfBuffer(), current_message_size_)) {
962 return false;
963 }
964 current_message_size_ = 0;
965 header_read_buffer_->set_offset(0);
966 body_read_buffer_->set_offset(0);
967 current_read_buffer_ = header_read_buffer_;
968 return true;
969 }
970
971 // static
972 bool CastSocket::Serialize(const CastMessage& message_proto,
973 std::string* message_data) {
974 DCHECK(message_data);
975 message_proto.SerializeToString(message_data);
976 size_t message_size = message_data->size();
977 if (message_size > MessageHeader::max_message_size()) {
978 message_data->clear();
979 return false;
980 }
981 CastSocket::MessageHeader header;
982 header.SetMessageSize(message_size);
983 header.PrependToString(message_data);
984 return true;
985 }
986 916
987 void CastSocket::CloseWithError() { 917 void CastSocket::CloseWithError() {
988 DCHECK(CalledOnValidThread()); 918 DCHECK(CalledOnValidThread());
989 CloseInternal(); 919 CloseInternal();
990 RunPendingCallbacksOnClose(); 920 RunPendingCallbacksOnClose();
991 if (delegate_) { 921 if (delegate_) {
992 logger_->LogSocketEvent(channel_id_, proto::NOTIFY_ON_ERROR); 922 logger_->LogSocketEvent(channel_id_, proto::NOTIFY_ON_ERROR);
993 delegate_->OnError(this, error_state_, logger_->GetLastErrors(channel_id_)); 923 delegate_->OnError(this, error_state_, logger_->GetLastErrors(channel_id_));
994 } 924 }
995 } 925 }
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1036 } 966 }
1037 } 967 }
1038 968
1039 void CastSocket::SetWriteState(WriteState write_state) { 969 void CastSocket::SetWriteState(WriteState write_state) {
1040 if (write_state_ != write_state) { 970 if (write_state_ != write_state) {
1041 write_state_ = write_state; 971 write_state_ = write_state;
1042 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); 972 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
1043 } 973 }
1044 } 974 }
1045 975
1046 CastSocket::MessageHeader::MessageHeader() : message_size(0) { } 976 PacketFramer::PacketFramer(scoped_refptr<net::GrowableIOBuffer> buffer)
977 : buffer_(buffer) {
978 Reset();
979 }
1047 980
1048 void CastSocket::MessageHeader::SetMessageSize(size_t size) { 981 PacketFramer::~PacketFramer() {
982 }
983
984 PacketFramer::MessageHeader::MessageHeader() : message_size(0) {
985 }
986
987 void PacketFramer::MessageHeader::SetMessageSize(size_t size) {
1049 DCHECK_LT(size, static_cast<size_t>(kuint32max)); 988 DCHECK_LT(size, static_cast<size_t>(kuint32max));
1050 DCHECK_GT(size, 0U); 989 DCHECK_GT(size, 0U);
1051 message_size = size; 990 message_size = size;
1052 } 991 }
1053 992
1054 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, 993 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle,
1055 // if bit-for-bit compatible. 994 // if bit-for-bit compatible.
1056 void CastSocket::MessageHeader::PrependToString(std::string* str) { 995 void PacketFramer::MessageHeader::PrependToString(std::string* str) {
1057 MessageHeader output = *this; 996 MessageHeader output = *this;
1058 output.message_size = base::HostToNet32(message_size); 997 output.message_size = base::HostToNet32(message_size);
1059 size_t header_size = base::checked_cast<size_t, uint32>( 998 size_t header_size = base::checked_cast<size_t, uint32>(
1060 MessageHeader::header_size()); 999 MessageHeader::header_size());
1061 scoped_ptr<char, base::FreeDeleter> char_array( 1000 scoped_ptr<char, base::FreeDeleter> char_array(
1062 static_cast<char*>(malloc(header_size))); 1001 static_cast<char*>(malloc(header_size)));
1063 memcpy(char_array.get(), &output, header_size); 1002 memcpy(char_array.get(), &output, header_size);
1064 str->insert(0, char_array.get(), header_size); 1003 str->insert(0, char_array.get(), header_size);
1065 } 1004 }
1066 1005
1067 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, 1006 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle,
1068 // if bit-for-bit compatible. 1007 // if bit-for-bit compatible.
1069 void CastSocket::MessageHeader::ReadFromIOBuffer( 1008 void PacketFramer::MessageHeader::Deserialize(char* data,
1070 net::GrowableIOBuffer* buffer, MessageHeader* header) { 1009 MessageHeader* header) {
1071 uint32 message_size; 1010 uint32 message_size;
1072 size_t header_size = base::checked_cast<size_t, uint32>( 1011 memcpy(&message_size, data, kHeaderSizeBytes);
1073 MessageHeader::header_size());
1074 memcpy(&message_size, buffer->StartOfBuffer(), header_size);
1075 header->message_size = base::NetToHost32(message_size); 1012 header->message_size = base::NetToHost32(message_size);
1076 } 1013 }
1077 1014
1078 std::string CastSocket::MessageHeader::ToString() { 1015 // static
1016 uint32 PacketFramer::MessageHeader::header_size() {
1017 return kHeaderSizeBytes;
1018 }
1019
1020 // static
1021 uint32 PacketFramer::MessageHeader::max_message_size() {
1022 return kMaxMessageSizeBytes;
1023 }
1024
1025 std::string PacketFramer::MessageHeader::ToString() {
1079 return "{message_size: " + base::UintToString(message_size) + "}"; 1026 return "{message_size: " + base::UintToString(message_size) + "}";
1080 } 1027 }
1081 1028
1082 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) 1029 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback)
1083 : callback(callback) { } 1030 : callback(callback) { }
1084 1031
1085 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { 1032 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) {
1086 DCHECK(!io_buffer.get()); 1033 DCHECK(!io_buffer.get());
1087 std::string message_data; 1034 std::string message_data;
1088 if (!Serialize(message_proto, &message_data)) 1035 if (!PacketFramer::Serialize(message_proto, &message_data)) {
1089 return false; 1036 return false;
1037 }
1090 message_namespace = message_proto.namespace_(); 1038 message_namespace = message_proto.namespace_();
1091 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), 1039 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data),
1092 message_data.size()); 1040 message_data.size());
1093 return true; 1041 return true;
1094 } 1042 }
1095 1043
1096 CastSocket::WriteRequest::~WriteRequest() { } 1044 CastSocket::WriteRequest::~WriteRequest() { }
1097 1045
1046 // static
1047 bool PacketFramer::Serialize(const CastMessage& message_proto,
1048 std::string* message_data) {
1049 DCHECK(message_data);
1050 message_proto.SerializeToString(message_data);
1051 size_t message_size = message_data->size();
1052 if (message_size > kMaxMessageSizeBytes) {
1053 message_data->clear();
1054 return false;
1055 }
1056 MessageHeader header;
1057 header.SetMessageSize(message_size);
1058 header.PrependToString(message_data);
1059 return true;
1060 }
1061
1062 size_t PacketFramer::BytesRequested() {
1063 if (current_element_ == HEADER) {
1064 size_t bytes_left = kHeaderSizeBytes - packet_bytes_read_;
1065 VLOG(2) << "Bytes needed for header: " << bytes_left;
1066 return bytes_left;
1067 } else if (current_element_ == BODY) {
1068 size_t bytes_left = (message_size_ + kHeaderSizeBytes) - packet_bytes_read_;
1069 VLOG(2) << "Bytes needed for body: " << bytes_left;
1070 return bytes_left;
1071 } else {
1072 NOTREACHED() << "Unhandled packet element type.";
1073 return 0;
1074 }
1075 }
1076
1077 bool PacketFramer::Ingest(uint32 num_bytes,
1078 CastMessage* message,
mark a. foltz 2014/08/26 20:37:02 I feel like the message being read should be owned
Kevin M 2014/08/27 01:14:03 Done.
1079 size_t* message_length,
1080 ChannelError* error) {
1081 DCHECK_EQ(base::checked_cast<int32>(packet_bytes_read_), buffer_->offset());
1082 DCHECK(message);
1083 DCHECK(error);
1084 CHECK_LE(num_bytes, BytesRequested());
1085
1086 bool was_message_parsed = false;
1087 packet_bytes_read_ += num_bytes;
1088 *error = CHANNEL_ERROR_NONE;
1089 if (current_element_ == HEADER) {
1090 if (BytesRequested() == 0) {
1091 MessageHeader header;
1092 MessageHeader::Deserialize(buffer_.get()->StartOfBuffer(), &header);
1093 if (header.message_size > MessageHeader::max_message_size()) {
1094 VLOG(1) << "Error parsing header (message size too large).";
1095 *error = CHANNEL_ERROR_INVALID_MESSAGE;
1096 return false;
1097 }
1098 current_element_ = BODY;
1099 message_size_ = header.message_size;
1100 }
1101 } else if (current_element_ == BODY) {
1102 if (BytesRequested() == 0) {
1103 CastMessage parsed_message;
1104 if (!parsed_message.ParseFromArray(
1105 buffer_->StartOfBuffer() + kHeaderSizeBytes, message_size_)) {
1106 VLOG(1) << "Error parsing packet body.";
1107 *error = CHANNEL_ERROR_INVALID_MESSAGE;
1108 return false;
1109 }
1110 parsed_message.Swap(message);
1111 *message_length = message_size_;
1112 was_message_parsed = true;
1113 Reset();
1114 }
1115 }
1116
1117 buffer_->set_offset(packet_bytes_read_);
1118 return was_message_parsed;
1119 }
1120
1121 void PacketFramer::Reset() {
1122 current_element_ = HEADER;
1123 packet_bytes_read_ = 0;
1124 message_size_ = 0;
1125 buffer_->set_offset(0);
1126 }
1098 } // namespace cast_channel 1127 } // namespace cast_channel
1099 } // namespace core_api 1128 } // namespace core_api
1100 } // namespace extensions 1129 } // namespace extensions
1101 1130
1102 #undef VLOG_WITH_CONNECTION 1131 #undef VLOG_WITH_CONNECTION
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698