Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Unified Diff: extensions/browser/api/cast_channel/cast_transport.cc

Issue 2926313002: Revert of [cast_channel] Move cast_channel related files from //extensions to //components (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: extensions/browser/api/cast_channel/cast_transport.cc
diff --git a/extensions/browser/api/cast_channel/cast_transport.cc b/extensions/browser/api/cast_channel/cast_transport.cc
new file mode 100644
index 0000000000000000000000000000000000000000..12168f42cc08302b342be5976cee0b79634bb4ea
--- /dev/null
+++ b/extensions/browser/api/cast_channel/cast_transport.cc
@@ -0,0 +1,455 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "extensions/browser/api/cast_channel/cast_transport.h"
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <string>
+#include <utility>
+
+#include "base/bind.h"
+#include "base/format_macros.h"
+#include "base/location.h"
+#include "base/numerics/safe_conversions.h"
+#include "base/single_thread_task_runner.h"
+#include "base/strings/stringprintf.h"
+#include "base/threading/thread_task_runner_handle.h"
+#include "extensions/browser/api/cast_channel/cast_framer.h"
+#include "extensions/browser/api/cast_channel/cast_message_util.h"
+#include "extensions/browser/api/cast_channel/logger.h"
+#include "extensions/common/api/cast_channel/cast_channel.pb.h"
+#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
+
+#define VLOG_WITH_CONNECTION(level) \
+ VLOG(level) << "[" << ip_endpoint_.ToString() << ", auth=" \
+ << ::cast_channel::ChannelAuthTypeToString(channel_auth_) \
+ << "] "
+
+namespace extensions {
+namespace api {
+namespace cast_channel {
+
+CastTransportImpl::CastTransportImpl(net::Socket* socket,
+ int channel_id,
+ const net::IPEndPoint& ip_endpoint,
+ ChannelAuthType channel_auth,
+ scoped_refptr<Logger> logger)
+ : started_(false),
+ socket_(socket),
+ write_state_(WRITE_STATE_IDLE),
+ read_state_(READ_STATE_READ),
+ error_state_(ChannelError::NONE),
+ channel_id_(channel_id),
+ ip_endpoint_(ip_endpoint),
+ channel_auth_(channel_auth),
+ logger_(logger) {
+ DCHECK(socket);
+
+ // Buffer is reused across messages to minimize unnecessary buffer
+ // [re]allocations.
+ read_buffer_ = new net::GrowableIOBuffer();
+ read_buffer_->SetCapacity(MessageFramer::MessageHeader::max_message_size());
+ framer_.reset(new MessageFramer(read_buffer_));
+}
+
+CastTransportImpl::~CastTransportImpl() {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ FlushWriteQueue();
+}
+
+bool CastTransportImpl::IsTerminalWriteState(
+ CastTransportImpl::WriteState write_state) {
+ return write_state == WRITE_STATE_ERROR || write_state == WRITE_STATE_IDLE;
+}
+
+bool CastTransportImpl::IsTerminalReadState(
+ CastTransportImpl::ReadState read_state) {
+ return read_state == READ_STATE_ERROR;
+}
+
+// static
+proto::ReadState CastTransportImpl::ReadStateToProto(
+ CastTransportImpl::ReadState state) {
+ switch (state) {
+ case CastTransportImpl::READ_STATE_UNKNOWN:
+ return proto::READ_STATE_UNKNOWN;
+ case CastTransportImpl::READ_STATE_READ:
+ return proto::READ_STATE_READ;
+ case CastTransportImpl::READ_STATE_READ_COMPLETE:
+ return proto::READ_STATE_READ_COMPLETE;
+ case CastTransportImpl::READ_STATE_DO_CALLBACK:
+ return proto::READ_STATE_DO_CALLBACK;
+ case CastTransportImpl::READ_STATE_HANDLE_ERROR:
+ return proto::READ_STATE_HANDLE_ERROR;
+ case CastTransportImpl::READ_STATE_ERROR:
+ return proto::READ_STATE_ERROR;
+ default:
+ NOTREACHED();
+ return proto::READ_STATE_UNKNOWN;
+ }
+}
+
+// static
+proto::WriteState CastTransportImpl::WriteStateToProto(
+ CastTransportImpl::WriteState state) {
+ switch (state) {
+ case CastTransportImpl::WRITE_STATE_IDLE:
+ return proto::WRITE_STATE_IDLE;
+ case CastTransportImpl::WRITE_STATE_UNKNOWN:
+ return proto::WRITE_STATE_UNKNOWN;
+ case CastTransportImpl::WRITE_STATE_WRITE:
+ return proto::WRITE_STATE_WRITE;
+ case CastTransportImpl::WRITE_STATE_WRITE_COMPLETE:
+ return proto::WRITE_STATE_WRITE_COMPLETE;
+ case CastTransportImpl::WRITE_STATE_DO_CALLBACK:
+ return proto::WRITE_STATE_DO_CALLBACK;
+ case CastTransportImpl::WRITE_STATE_HANDLE_ERROR:
+ return proto::WRITE_STATE_HANDLE_ERROR;
+ case CastTransportImpl::WRITE_STATE_ERROR:
+ return proto::WRITE_STATE_ERROR;
+ default:
+ NOTREACHED();
+ return proto::WRITE_STATE_UNKNOWN;
+ }
+}
+
+// static
+proto::ErrorState CastTransportImpl::ErrorStateToProto(ChannelError state) {
+ switch (state) {
+ case ChannelError::NONE:
+ return proto::CHANNEL_ERROR_NONE;
+ case ChannelError::CHANNEL_NOT_OPEN:
+ return proto::CHANNEL_ERROR_CHANNEL_NOT_OPEN;
+ case ChannelError::AUTHENTICATION_ERROR:
+ return proto::CHANNEL_ERROR_AUTHENTICATION_ERROR;
+ case ChannelError::CONNECT_ERROR:
+ return proto::CHANNEL_ERROR_CONNECT_ERROR;
+ case ChannelError::CAST_SOCKET_ERROR:
+ return proto::CHANNEL_ERROR_SOCKET_ERROR;
+ case ChannelError::TRANSPORT_ERROR:
+ return proto::CHANNEL_ERROR_TRANSPORT_ERROR;
+ case ChannelError::INVALID_MESSAGE:
+ return proto::CHANNEL_ERROR_INVALID_MESSAGE;
+ case ChannelError::INVALID_CHANNEL_ID:
+ return proto::CHANNEL_ERROR_INVALID_CHANNEL_ID;
+ case ChannelError::CONNECT_TIMEOUT:
+ return proto::CHANNEL_ERROR_CONNECT_TIMEOUT;
+ case ChannelError::UNKNOWN:
+ return proto::CHANNEL_ERROR_UNKNOWN;
+ default:
+ NOTREACHED();
+ return proto::CHANNEL_ERROR_NONE;
+ }
+}
+
+void CastTransportImpl::SetReadDelegate(std::unique_ptr<Delegate> delegate) {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ DCHECK(delegate);
+ delegate_ = std::move(delegate);
+ if (started_) {
+ delegate_->Start();
+ }
+}
+
+void CastTransportImpl::FlushWriteQueue() {
+ for (; !write_queue_.empty(); write_queue_.pop()) {
+ net::CompletionCallback& callback = write_queue_.front().callback;
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::Bind(callback, net::ERR_FAILED));
+ callback.Reset();
+ }
+}
+
+void CastTransportImpl::SendMessage(const CastMessage& message,
+ const net::CompletionCallback& callback) {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ std::string serialized_message;
+ if (!MessageFramer::Serialize(message, &serialized_message)) {
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::Bind(callback, net::ERR_FAILED));
+ return;
+ }
+ WriteRequest write_request(
+ message.namespace_(), serialized_message, callback);
+
+ write_queue_.push(write_request);
+ if (write_state_ == WRITE_STATE_IDLE) {
+ SetWriteState(WRITE_STATE_WRITE);
+ OnWriteResult(net::OK);
+ }
+}
+
+CastTransportImpl::WriteRequest::WriteRequest(
+ const std::string& namespace_,
+ const std::string& payload,
+ const net::CompletionCallback& callback)
+ : message_namespace(namespace_), callback(callback) {
+ VLOG(2) << "WriteRequest size: " << payload.size();
+ io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(payload),
+ payload.size());
+}
+
+CastTransportImpl::WriteRequest::WriteRequest(const WriteRequest& other) =
+ default;
+
+CastTransportImpl::WriteRequest::~WriteRequest() {
+}
+
+void CastTransportImpl::SetReadState(ReadState read_state) {
+ if (read_state_ != read_state)
+ read_state_ = read_state;
+}
+
+void CastTransportImpl::SetWriteState(WriteState write_state) {
+ if (write_state_ != write_state)
+ write_state_ = write_state;
+}
+
+void CastTransportImpl::SetErrorState(ChannelError error_state) {
+ VLOG_WITH_CONNECTION(2) << "SetErrorState: "
+ << ::cast_channel::ChannelErrorToString(error_state);
+ error_state_ = error_state;
+}
+
+void CastTransportImpl::OnWriteResult(int result) {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ DCHECK_NE(WRITE_STATE_IDLE, write_state_);
+ if (write_queue_.empty()) {
+ SetWriteState(WRITE_STATE_IDLE);
+ return;
+ }
+
+ // Network operations can either finish synchronously or asynchronously.
+ // This method executes the state machine transitions in a loop so that
+ // write state transitions happen even when network operations finish
+ // synchronously.
+ int rv = result;
+ do {
+ VLOG_WITH_CONNECTION(2) << "OnWriteResult (state=" << write_state_ << ", "
+ << "result=" << rv << ", "
+ << "queue size=" << write_queue_.size() << ")";
+
+ WriteState state = write_state_;
+ write_state_ = WRITE_STATE_UNKNOWN;
+ switch (state) {
+ case WRITE_STATE_WRITE:
+ rv = DoWrite();
+ break;
+ case WRITE_STATE_WRITE_COMPLETE:
+ rv = DoWriteComplete(rv);
+ break;
+ case WRITE_STATE_DO_CALLBACK:
+ rv = DoWriteCallback();
+ break;
+ case WRITE_STATE_HANDLE_ERROR:
+ rv = DoWriteHandleError(rv);
+ DCHECK_EQ(WRITE_STATE_ERROR, write_state_);
+ break;
+ default:
+ NOTREACHED() << "Unknown state in write state machine: " << state;
+ SetWriteState(WRITE_STATE_ERROR);
+ SetErrorState(ChannelError::UNKNOWN);
+ rv = net::ERR_FAILED;
+ break;
+ }
+ } while (rv != net::ERR_IO_PENDING && !IsTerminalWriteState(write_state_));
+
+ if (write_state_ == WRITE_STATE_ERROR) {
+ FlushWriteQueue();
+ DCHECK_NE(ChannelError::NONE, error_state_);
+ VLOG_WITH_CONNECTION(2) << "Sending OnError().";
+ delegate_->OnError(error_state_);
+ }
+}
+
+int CastTransportImpl::DoWrite() {
+ DCHECK(!write_queue_.empty());
+ WriteRequest& request = write_queue_.front();
+
+ VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
+ << request.io_buffer->size() << " bytes_written "
+ << request.io_buffer->BytesConsumed();
+
+ SetWriteState(WRITE_STATE_WRITE_COMPLETE);
+
+ int rv = socket_->Write(
+ request.io_buffer.get(), request.io_buffer->BytesRemaining(),
+ base::Bind(&CastTransportImpl::OnWriteResult, base::Unretained(this)));
+ return rv;
+}
+
+int CastTransportImpl::DoWriteComplete(int result) {
+ VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result;
+ DCHECK(!write_queue_.empty());
+ if (result <= 0) { // NOTE that 0 also indicates an error
+ logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, result);
+ SetErrorState(ChannelError::CAST_SOCKET_ERROR);
+ SetWriteState(WRITE_STATE_HANDLE_ERROR);
+ return result == 0 ? net::ERR_FAILED : result;
+ }
+
+ // Some bytes were successfully written
+ WriteRequest& request = write_queue_.front();
+ scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
+ io_buffer->DidConsume(result);
+ if (io_buffer->BytesRemaining() == 0) { // Message fully sent
+ SetWriteState(WRITE_STATE_DO_CALLBACK);
+ } else {
+ SetWriteState(WRITE_STATE_WRITE);
+ }
+
+ return net::OK;
+}
+
+int CastTransportImpl::DoWriteCallback() {
+ VLOG_WITH_CONNECTION(2) << "DoWriteCallback";
+ DCHECK(!write_queue_.empty());
+
+ WriteRequest& request = write_queue_.front();
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::Bind(request.callback, net::OK));
+
+ write_queue_.pop();
+ if (write_queue_.empty()) {
+ SetWriteState(WRITE_STATE_IDLE);
+ } else {
+ SetWriteState(WRITE_STATE_WRITE);
+ }
+
+ return net::OK;
+}
+
+int CastTransportImpl::DoWriteHandleError(int result) {
+ VLOG_WITH_CONNECTION(2) << "DoWriteHandleError result=" << result;
+ DCHECK_NE(ChannelError::NONE, error_state_);
+ DCHECK_LT(result, 0);
+ SetWriteState(WRITE_STATE_ERROR);
+ return net::ERR_FAILED;
+}
+
+void CastTransportImpl::Start() {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ DCHECK(!started_);
+ DCHECK_EQ(READ_STATE_READ, read_state_);
+ DCHECK(delegate_) << "Read delegate must be set prior to calling Start()";
+ started_ = true;
+ delegate_->Start();
+ SetReadState(READ_STATE_READ);
+
+ // Start the read state machine.
+ OnReadResult(net::OK);
+}
+
+void CastTransportImpl::OnReadResult(int result) {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ // Network operations can either finish synchronously or asynchronously.
+ // This method executes the state machine transitions in a loop so that
+ // write state transitions happen even when network operations finish
+ // synchronously.
+ int rv = result;
+ do {
+ VLOG_WITH_CONNECTION(2) << "OnReadResult(state=" << read_state_
+ << ", result=" << rv << ")";
+ ReadState state = read_state_;
+ read_state_ = READ_STATE_UNKNOWN;
+
+ switch (state) {
+ case READ_STATE_READ:
+ rv = DoRead();
+ break;
+ case READ_STATE_READ_COMPLETE:
+ rv = DoReadComplete(rv);
+ break;
+ case READ_STATE_DO_CALLBACK:
+ rv = DoReadCallback();
+ break;
+ case READ_STATE_HANDLE_ERROR:
+ rv = DoReadHandleError(rv);
+ DCHECK_EQ(read_state_, READ_STATE_ERROR);
+ break;
+ default:
+ NOTREACHED() << "Unknown state in read state machine: " << state;
+ SetReadState(READ_STATE_ERROR);
+ SetErrorState(ChannelError::UNKNOWN);
+ rv = net::ERR_FAILED;
+ break;
+ }
+ } while (rv != net::ERR_IO_PENDING && !IsTerminalReadState(read_state_));
+
+ if (IsTerminalReadState(read_state_)) {
+ DCHECK_EQ(READ_STATE_ERROR, read_state_);
+ VLOG_WITH_CONNECTION(2) << "Sending OnError().";
+ delegate_->OnError(error_state_);
+ }
+}
+
+int CastTransportImpl::DoRead() {
+ VLOG_WITH_CONNECTION(2) << "DoRead";
+ SetReadState(READ_STATE_READ_COMPLETE);
+
+ // Determine how many bytes need to be read.
+ size_t num_bytes_to_read = framer_->BytesRequested();
+ DCHECK_GT(num_bytes_to_read, 0u);
+
+ // Read up to num_bytes_to_read into |current_read_buffer_|.
+ return socket_->Read(
+ read_buffer_.get(), base::checked_cast<uint32_t>(num_bytes_to_read),
+ base::Bind(&CastTransportImpl::OnReadResult, base::Unretained(this)));
+}
+
+int CastTransportImpl::DoReadComplete(int result) {
+ VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result;
+ if (result <= 0) {
+ logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, result);
+ VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket.";
+ SetErrorState(ChannelError::CAST_SOCKET_ERROR);
+ SetReadState(READ_STATE_HANDLE_ERROR);
+ return result == 0 ? net::ERR_FAILED : result;
+ }
+
+ size_t message_size;
+ DCHECK(!current_message_);
+ ChannelError framing_error;
+ current_message_ = framer_->Ingest(result, &message_size, &framing_error);
+ if (current_message_.get() && (framing_error == ChannelError::NONE)) {
+ DCHECK_GT(message_size, static_cast<size_t>(0));
+ SetReadState(READ_STATE_DO_CALLBACK);
+ } else if (framing_error != ChannelError::NONE) {
+ DCHECK(!current_message_);
+ SetErrorState(ChannelError::INVALID_MESSAGE);
+ SetReadState(READ_STATE_HANDLE_ERROR);
+ } else {
+ DCHECK(!current_message_);
+ SetReadState(READ_STATE_READ);
+ }
+ return net::OK;
+}
+
+int CastTransportImpl::DoReadCallback() {
+ VLOG_WITH_CONNECTION(2) << "DoReadCallback";
+ if (!IsCastMessageValid(*current_message_)) {
+ SetReadState(READ_STATE_HANDLE_ERROR);
+ SetErrorState(ChannelError::INVALID_MESSAGE);
+ return net::ERR_INVALID_RESPONSE;
+ }
+ SetReadState(READ_STATE_READ);
+ delegate_->OnMessage(*current_message_);
+ current_message_.reset();
+ return net::OK;
+}
+
+int CastTransportImpl::DoReadHandleError(int result) {
+ VLOG_WITH_CONNECTION(2) << "DoReadHandleError";
+ DCHECK_NE(ChannelError::NONE, error_state_);
+ DCHECK_LE(result, 0);
+ SetReadState(READ_STATE_ERROR);
+ return net::ERR_FAILED;
+}
+
+} // namespace cast_channel
+} // namespace api
+} // namespace extensions
« no previous file with comments | « extensions/browser/api/cast_channel/cast_transport.h ('k') | extensions/browser/api/cast_channel/cast_transport_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698