Index: extensions/browser/api/cast_channel/cast_socket.cc |
diff --git a/extensions/browser/api/cast_channel/cast_socket.cc b/extensions/browser/api/cast_channel/cast_socket.cc |
index dac6e0d395b84e113b6811912310f1f3c6219ba0..68455cc515bf6c41ed72a7c6faa3eef1de8a39ad 100644 |
--- a/extensions/browser/api/cast_channel/cast_socket.cc |
+++ b/extensions/browser/api/cast_channel/cast_socket.cc |
@@ -47,7 +47,10 @@ namespace { |
// after 9 failed probes. So the total idle time before close is 10 * |
// kTcpKeepAliveDelaySecs. |
const int kTcpKeepAliveDelaySecs = 10; |
- |
+// Size of a CastSocket header payload. |
+const size_t kHeaderSizeBytes = sizeof(int32); |
+// Maximum byte count for a CastSocket message. |
+const size_t kMaxMessageSizeBytes = 65536; |
} // namespace |
namespace extensions { |
@@ -189,13 +192,12 @@ CastSocket::CastSocket(const std::string& owner_extension_id, |
ip_endpoint_(ip_endpoint), |
channel_auth_(channel_auth), |
delegate_(delegate), |
- current_message_size_(0), |
- current_message_(new CastMessage()), |
net_log_(net_log), |
logger_(logger), |
connect_timeout_(timeout), |
connect_timeout_timer_(new base::OneShotTimer<CastSocket>), |
is_canceled_(false), |
+ current_message_(new CastMessage), |
connect_state_(CONN_STATE_NONE), |
write_state_(WRITE_STATE_NONE), |
read_state_(READ_STATE_NONE), |
@@ -207,12 +209,10 @@ CastSocket::CastSocket(const std::string& owner_extension_id, |
net_log_source_.type = net::NetLog::SOURCE_SOCKET; |
net_log_source_.id = net_log_->NextID(); |
- // Reuse these buffers for each message. |
- header_read_buffer_ = new net::GrowableIOBuffer(); |
- header_read_buffer_->SetCapacity(MessageHeader::header_size()); |
- body_read_buffer_ = new net::GrowableIOBuffer(); |
- body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); |
- current_read_buffer_ = header_read_buffer_; |
+ // Buffer is reused across messages. |
+ read_buffer_ = new net::GrowableIOBuffer(); |
+ read_buffer_->SetCapacity(kMaxMessageSizeBytes); |
+ framer_.reset(new MessageFramer(read_buffer_)); |
} |
CastSocket::~CastSocket() { |
@@ -272,16 +272,18 @@ bool CastSocket::ExtractPeerCert(std::string* cert) { |
DCHECK(cert); |
DCHECK(peer_cert_.empty()); |
net::SSLInfo ssl_info; |
- if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) |
+ if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) { |
return false; |
+ } |
logger_->LogSocketEvent(channel_id_, proto::SSL_INFO_OBTAINED); |
bool result = net::X509Certificate::GetDEREncoded( |
ssl_info.cert->os_cert_handle(), cert); |
- if (result) |
+ if (result) { |
VLOG_WITH_CONNECTION(1) << "Successfully extracted peer certificate: " |
<< *cert; |
+ } |
logger_->LogSocketEventWithRv( |
channel_id_, proto::DER_ENCODED_CERT_OBTAIN, result ? 1 : 0); |
@@ -488,8 +490,9 @@ void CastSocket::DoAuthChallengeSendWriteComplete(int result) { |
int CastSocket::DoAuthChallengeSendComplete(int result) { |
VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; |
- if (result < 0) |
+ if (result < 0) { |
return result; |
+ } |
SetConnectState(CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE); |
// Post a task to start read loop so that DoReadLoop is not nested inside |
@@ -502,10 +505,12 @@ int CastSocket::DoAuthChallengeSendComplete(int result) { |
int CastSocket::DoAuthChallengeReplyComplete(int result) { |
VLOG_WITH_CONNECTION(1) << "DoAuthChallengeReplyComplete: " << result; |
- if (result < 0) |
+ if (result < 0) { |
return result; |
- if (!VerifyChallengeReply()) |
+ } |
+ if (!VerifyChallengeReply()) { |
return net::ERR_FAILED; |
+ } |
VLOG_WITH_CONNECTION(1) << "Auth challenge verification succeeded"; |
return net::OK; |
} |
@@ -666,12 +671,14 @@ void CastSocket::DoWriteLoop(int result) { |
// If write loop is done because the queue is empty then set write |
// state to NONE |
- if (write_queue_.empty()) |
+ if (write_queue_.empty()) { |
SetWriteState(WRITE_STATE_NONE); |
+ } |
// Write loop is done - if the result is ERR_FAILED then close with error. |
- if (rv == net::ERR_FAILED) |
+ if (rv == net::ERR_FAILED) { |
CloseWithError(); |
+ } |
} |
int CastSocket::DoWrite() { |
@@ -705,10 +712,11 @@ int CastSocket::DoWriteComplete(int result) { |
WriteRequest& request = write_queue_.front(); |
scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; |
io_buffer->DidConsume(result); |
- if (io_buffer->BytesRemaining() == 0) // Message fully sent |
+ if (io_buffer->BytesRemaining() == 0) { // Message fully sent |
SetWriteState(WRITE_STATE_DO_CALLBACK); |
- else |
+ } else { |
SetWriteState(WRITE_STATE_WRITE); |
+ } |
return net::OK; |
} |
@@ -824,24 +832,14 @@ void CastSocket::DoReadLoop(int result) { |
int CastSocket::DoRead() { |
SetReadState(READ_STATE_READ_COMPLETE); |
- // Figure out whether to read header or body, and the remaining bytes. |
- uint32 num_bytes_to_read = 0; |
- if (header_read_buffer_->RemainingCapacity() > 0) { |
- current_read_buffer_ = header_read_buffer_; |
- num_bytes_to_read = header_read_buffer_->RemainingCapacity(); |
- CHECK_LE(num_bytes_to_read, MessageHeader::header_size()); |
- } else { |
- DCHECK_GT(current_message_size_, 0U); |
- num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); |
- current_read_buffer_ = body_read_buffer_; |
- CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); |
- } |
- CHECK_GT(num_bytes_to_read, 0U); |
+ |
+ // Determine how many bytes need to be read. |
+ size_t num_bytes_to_read = framer_->BytesRequested(); |
// Read up to num_bytes_to_read into |current_read_buffer_|. |
int rv = socket_->Read( |
- current_read_buffer_.get(), |
- num_bytes_to_read, |
+ read_buffer_.get(), |
+ base::checked_cast<uint32>(num_bytes_to_read), |
base::Bind(&CastSocket::DoReadLoop, base::Unretained(this))); |
logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, rv); |
@@ -849,10 +847,8 @@ int CastSocket::DoRead() { |
} |
int CastSocket::DoReadComplete(int result) { |
- VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result |
- << " header offset = " |
- << header_read_buffer_->offset() |
- << " body offset = " << body_read_buffer_->offset(); |
+ VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result; |
+ |
if (result <= 0) { // 0 means EOF: the peer closed the socket |
VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; |
SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); |
@@ -860,44 +856,20 @@ int CastSocket::DoReadComplete(int result) { |
return result == 0 ? net::ERR_FAILED : result; |
} |
- // Some data was read. Move the offset in the current buffer forward. |
- CHECK_LE(current_read_buffer_->offset() + result, |
- current_read_buffer_->capacity()); |
- current_read_buffer_->set_offset(current_read_buffer_->offset() + result); |
- |
- if (current_read_buffer_.get() == header_read_buffer_.get() && |
- current_read_buffer_->RemainingCapacity() == 0) { |
- // A full header is read, process the contents. |
- if (!ProcessHeader()) { |
- SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); |
- SetReadState(READ_STATE_ERROR); |
- } else { |
- // Processed header, now read the body. |
- SetReadState(READ_STATE_READ); |
- } |
- } else if (current_read_buffer_.get() == body_read_buffer_.get() && |
- static_cast<uint32>(current_read_buffer_->offset()) == |
- current_message_size_) { |
- // Store a copy of current_message_size_ since it will be reset by |
- // ProcessBody(). |
- uint32 message_size = current_message_size_; |
- // Full body is read, process the contents. |
- if (ProcessBody()) { |
- logger_->LogSocketEventForMessage( |
- channel_id_, |
- proto::MESSAGE_READ, |
- current_message_->namespace_(), |
- base::StringPrintf("Message size: %u", message_size)); |
- SetReadState(READ_STATE_DO_CALLBACK); |
- } else { |
- SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); |
- SetReadState(READ_STATE_ERROR); |
- } |
+ size_t message_size; |
+ current_message_ = framer_->Ingest(result, &message_size, &error_state_); |
mark a. foltz
2014/08/28 21:45:21
Before calling this DCHECK that current_message_ i
Kevin M
2014/08/28 23:33:32
Done.
|
+ if (current_message_.get()) { |
+ logger_->LogSocketEventForMessage( |
mark a. foltz
2014/08/28 21:45:21
DCHECK that error_state_ and message_size_ are set
Kevin M
2014/08/28 23:33:31
The size is computed within the framer itself - ar
|
+ channel_id_, |
+ proto::MESSAGE_READ, |
+ current_message_->namespace_(), |
+ base::StringPrintf("Message size: %zu", message_size)); |
+ SetReadState(READ_STATE_DO_CALLBACK); |
+ } else if (error_state_ != CHANNEL_ERROR_NONE) { |
+ SetReadState(READ_STATE_ERROR); |
mark a. foltz
2014/08/28 21:45:22
DCHECK that current_message_ is NULL.
Kevin M
2014/08/28 23:33:32
This is guaranteed by the "else" clause of the fir
|
} else { |
- // Have not received full header or full body yet; keep reading. |
SetReadState(READ_STATE_READ); |
} |
- |
return net::OK; |
} |
@@ -940,49 +912,7 @@ int CastSocket::DoReadError(int result) { |
return net::ERR_FAILED; |
} |
-bool CastSocket::ProcessHeader() { |
- CHECK_EQ(static_cast<uint32>(header_read_buffer_->offset()), |
- MessageHeader::header_size()); |
- MessageHeader header; |
- MessageHeader::ReadFromIOBuffer(header_read_buffer_.get(), &header); |
- if (header.message_size > MessageHeader::max_message_size()) |
- return false; |
- |
- VLOG_WITH_CONNECTION(2) << "Parsed header { message_size: " |
- << header.message_size << " }"; |
- current_message_size_ = header.message_size; |
- return true; |
-} |
-bool CastSocket::ProcessBody() { |
- CHECK_EQ(static_cast<uint32>(body_read_buffer_->offset()), |
- current_message_size_); |
- if (!current_message_->ParseFromArray( |
- body_read_buffer_->StartOfBuffer(), current_message_size_)) { |
- return false; |
- } |
- current_message_size_ = 0; |
- header_read_buffer_->set_offset(0); |
- body_read_buffer_->set_offset(0); |
- current_read_buffer_ = header_read_buffer_; |
- return true; |
-} |
- |
-// static |
-bool CastSocket::Serialize(const CastMessage& message_proto, |
- std::string* message_data) { |
- DCHECK(message_data); |
- message_proto.SerializeToString(message_data); |
- size_t message_size = message_data->size(); |
- if (message_size > MessageHeader::max_message_size()) { |
- message_data->clear(); |
- return false; |
- } |
- CastSocket::MessageHeader header; |
- header.SetMessageSize(message_size); |
- header.PrependToString(message_data); |
- return true; |
-} |
void CastSocket::CloseWithError() { |
DCHECK(CalledOnValidThread()); |
@@ -1043,9 +973,18 @@ void CastSocket::SetWriteState(WriteState write_state) { |
} |
} |
-CastSocket::MessageHeader::MessageHeader() : message_size(0) { } |
+MessageFramer::MessageFramer(scoped_refptr<net::GrowableIOBuffer> input_buffer) |
+ : input_buffer_(input_buffer) { |
+ Reset(); |
+} |
+ |
+MessageFramer::~MessageFramer() { |
+} |
+ |
+MessageFramer::MessageHeader::MessageHeader() : message_size(0) { |
+} |
-void CastSocket::MessageHeader::SetMessageSize(size_t size) { |
+void MessageFramer::MessageHeader::SetMessageSize(size_t size) { |
DCHECK_LT(size, static_cast<size_t>(kuint32max)); |
DCHECK_GT(size, 0U); |
message_size = size; |
@@ -1053,11 +992,10 @@ void CastSocket::MessageHeader::SetMessageSize(size_t size) { |
// TODO(mfoltz): Investigate replacing header serialization with base::Pickle, |
// if bit-for-bit compatible. |
-void CastSocket::MessageHeader::PrependToString(std::string* str) { |
+void MessageFramer::MessageHeader::PrependToString(std::string* str) { |
MessageHeader output = *this; |
output.message_size = base::HostToNet32(message_size); |
- size_t header_size = base::checked_cast<size_t, uint32>( |
- MessageHeader::header_size()); |
+ size_t header_size = MessageHeader::header_size(); |
scoped_ptr<char, base::FreeDeleter> char_array( |
static_cast<char*>(malloc(header_size))); |
memcpy(char_array.get(), &output, header_size); |
@@ -1066,16 +1004,25 @@ void CastSocket::MessageHeader::PrependToString(std::string* str) { |
// TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, |
// if bit-for-bit compatible. |
-void CastSocket::MessageHeader::ReadFromIOBuffer( |
- net::GrowableIOBuffer* buffer, MessageHeader* header) { |
+void MessageFramer::MessageHeader::Deserialize(char* data, |
+ MessageHeader* header) { |
uint32 message_size; |
- size_t header_size = base::checked_cast<size_t, uint32>( |
- MessageHeader::header_size()); |
- memcpy(&message_size, buffer->StartOfBuffer(), header_size); |
- header->message_size = base::NetToHost32(message_size); |
+ memcpy(&message_size, data, kHeaderSizeBytes); |
+ header->message_size = |
+ base::checked_cast<size_t>(base::NetToHost32(message_size)); |
} |
-std::string CastSocket::MessageHeader::ToString() { |
+// static |
+size_t MessageFramer::MessageHeader::header_size() { |
+ return kHeaderSizeBytes; |
+} |
+ |
+// static |
+size_t MessageFramer::MessageHeader::max_message_size() { |
+ return kMaxMessageSizeBytes; |
+} |
+ |
+std::string MessageFramer::MessageHeader::ToString() { |
return "{message_size: " + base::UintToString(message_size) + "}"; |
} |
@@ -1085,8 +1032,9 @@ CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) |
bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { |
DCHECK(!io_buffer.get()); |
std::string message_data; |
- if (!Serialize(message_proto, &message_data)) |
+ if (!MessageFramer::Serialize(message_proto, &message_data)) { |
return false; |
+ } |
message_namespace = message_proto.namespace_(); |
io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), |
message_data.size()); |
@@ -1095,6 +1043,84 @@ bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { |
CastSocket::WriteRequest::~WriteRequest() { } |
+// static |
+bool MessageFramer::Serialize(const CastMessage& message_proto, |
+ std::string* message_data) { |
+ DCHECK(message_data); |
+ message_proto.SerializeToString(message_data); |
+ size_t message_size = message_data->size(); |
+ if (message_size > kMaxMessageSizeBytes) { |
+ message_data->clear(); |
+ return false; |
+ } |
+ MessageHeader header; |
+ header.SetMessageSize(message_size); |
+ header.PrependToString(message_data); |
+ return true; |
+} |
+ |
+size_t MessageFramer::BytesRequested() { |
+ if (current_element_ == HEADER) { |
mark a. foltz
2014/08/28 21:45:22
switch statement?
Kevin M
2014/08/28 23:33:31
Done.
|
+ size_t bytes_left = kHeaderSizeBytes - packet_bytes_read_; |
+ VLOG(2) << "Bytes needed for header: " << bytes_left; |
+ return bytes_left; |
mark a. foltz
2014/08/28 21:45:22
DCHECK(bytes_left <= kHeaderSizeBytes)
Kevin M
2014/08/28 23:33:32
Done.
|
+ } else if (current_element_ == BODY) { |
+ size_t bytes_left = (body_size_ + kHeaderSizeBytes) - packet_bytes_read_; |
+ VLOG(2) << "Bytes needed for body: " << bytes_left; |
mark a. foltz
2014/08/28 21:45:22
DCHECK(bytes_left <= kMessageSizeBytes - kHeaderSi
Kevin M
2014/08/28 23:33:32
Done.
|
+ return bytes_left; |
+ } else { |
+ NOTREACHED() << "Unhandled packet element type."; |
+ return 0; |
+ } |
+} |
+ |
+scoped_ptr<CastMessage> MessageFramer::Ingest(size_t num_bytes, |
+ size_t* message_length, |
+ ChannelError* error) { |
+ DCHECK_EQ(base::checked_cast<int32>(packet_bytes_read_), |
+ input_buffer_->offset()); |
+ DCHECK(error); |
+ CHECK_LE(num_bytes, BytesRequested()); |
mark a. foltz
2014/08/28 21:45:21
DCHECK(message_length)
Kevin M
2014/08/28 23:33:32
Done.
|
+ |
+ packet_bytes_read_ += num_bytes; |
+ *error = CHANNEL_ERROR_NONE; |
+ if (current_element_ == HEADER) { |
mark a. foltz
2014/08/28 21:45:21
switch statement with a default case of NOTREACHED
Kevin M
2014/08/28 23:33:31
Done.
|
+ if (BytesRequested() == 0) { |
+ MessageHeader header; |
+ MessageHeader::Deserialize(input_buffer_.get()->StartOfBuffer(), &header); |
+ if (header.message_size > MessageHeader::max_message_size()) { |
+ VLOG(1) << "Error parsing header (message size too large)."; |
+ *error = CHANNEL_ERROR_INVALID_MESSAGE; |
+ return scoped_ptr<CastMessage>(); |
+ } |
+ current_element_ = BODY; |
+ body_size_ = header.message_size; |
+ } |
+ } else if (current_element_ == BODY) { |
+ if (BytesRequested() == 0) { |
+ scoped_ptr<CastMessage> parsed_message(new CastMessage); |
+ if (!parsed_message->ParseFromArray( |
+ input_buffer_->StartOfBuffer() + kHeaderSizeBytes, body_size_)) { |
+ VLOG(1) << "Error parsing packet body."; |
+ *error = CHANNEL_ERROR_INVALID_MESSAGE; |
+ return scoped_ptr<CastMessage>(); |
mark a. foltz
2014/08/28 21:45:21
It would seem safest to put the framer into a term
Kevin M
2014/08/28 23:33:32
Done.
|
+ } |
+ *message_length = body_size_; |
+ Reset(); |
+ return parsed_message.Pass(); |
+ } |
+ } |
+ |
+ input_buffer_->set_offset(packet_bytes_read_); |
+ return scoped_ptr<CastMessage>(); |
+} |
+ |
+void MessageFramer::Reset() { |
+ current_element_ = HEADER; |
+ packet_bytes_read_ = 0; |
+ body_size_ = 0; |
+ input_buffer_->set_offset(0); |
+} |
mark a. foltz
2014/08/28 21:45:22
Nit: add empty line after this definition.
Kevin M
2014/08/28 23:33:31
Done.
|
} // namespace cast_channel |
} // namespace core_api |
} // namespace extensions |