| Index: content/browser/renderer_host/websocket_host.cc
|
| diff --git a/content/browser/renderer_host/websocket_host.cc b/content/browser/renderer_host/websocket_host.cc
|
| index d0715889db733e6a55e4187fb5752372b909f85d..4a291a0b51e3103d425e6e79ce202def4dd42428 100644
|
| --- a/content/browser/renderer_host/websocket_host.cc
|
| +++ b/content/browser/renderer_host/websocket_host.cc
|
| @@ -17,6 +17,7 @@
|
| #include "base/strings/stringprintf.h"
|
| #include "base/thread_task_runner_handle.h"
|
| #include "content/browser/bad_message.h"
|
| +#include "content/browser/renderer_host/websocket_blob_receiver.h"
|
| #include "content/browser/renderer_host/websocket_blob_sender.h"
|
| #include "content/browser/renderer_host/websocket_dispatcher_host.h"
|
| #include "content/browser/ssl/ssl_error_handler.h"
|
| @@ -33,7 +34,6 @@
|
| #include "net/ssl/ssl_info.h"
|
| #include "net/websockets/websocket_channel.h"
|
| #include "net/websockets/websocket_errors.h"
|
| -#include "net/websockets/websocket_event_interface.h"
|
| #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode
|
| #include "net/websockets/websocket_handshake_request_info.h"
|
| #include "net/websockets/websocket_handshake_response_info.h"
|
| @@ -43,55 +43,56 @@ namespace content {
|
|
|
| namespace {
|
|
|
| -typedef net::WebSocketEventInterface::ChannelState ChannelState;
|
| +using net::WebSocketEventInterface;
|
| +using net::WebSocketFrameHeader;
|
| +using ChannelState = WebSocketEventInterface::ChannelState;
|
| +using OpCode = WebSocketFrameHeader::OpCode;
|
| +
|
| +const auto CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
|
| +const auto CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
|
| +const auto WEBSOCKET_HOST_ALIVE = WebSocketDispatcherHost::WEBSOCKET_HOST_ALIVE;
|
| +const auto WEBSOCKET_HOST_DELETED =
|
| + WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED;
|
|
|
| // Convert a content::WebSocketMessageType to a
|
| -// net::WebSocketFrameHeader::OpCode
|
| -net::WebSocketFrameHeader::OpCode MessageTypeToOpCode(
|
| - WebSocketMessageType type) {
|
| +// WebSocketFrameHeader::OpCode
|
| +OpCode MessageTypeToOpCode(WebSocketMessageType type) {
|
| DCHECK(type == WEB_SOCKET_MESSAGE_TYPE_CONTINUATION ||
|
| type == WEB_SOCKET_MESSAGE_TYPE_TEXT ||
|
| type == WEB_SOCKET_MESSAGE_TYPE_BINARY);
|
| - typedef net::WebSocketFrameHeader::OpCode OpCode;
|
| // These compile asserts verify that the same underlying values are used for
|
| // both types, so we can simply cast between them.
|
| static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_CONTINUATION) ==
|
| - net::WebSocketFrameHeader::kOpCodeContinuation,
|
| + WebSocketFrameHeader::kOpCodeContinuation,
|
| "enum values must match for opcode continuation");
|
| static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_TEXT) ==
|
| - net::WebSocketFrameHeader::kOpCodeText,
|
| + WebSocketFrameHeader::kOpCodeText,
|
| "enum values must match for opcode text");
|
| static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_BINARY) ==
|
| - net::WebSocketFrameHeader::kOpCodeBinary,
|
| + WebSocketFrameHeader::kOpCodeBinary,
|
| "enum values must match for opcode binary");
|
| return static_cast<OpCode>(type);
|
| }
|
|
|
| -WebSocketMessageType OpCodeToMessageType(
|
| - net::WebSocketFrameHeader::OpCode opCode) {
|
| - DCHECK(opCode == net::WebSocketFrameHeader::kOpCodeContinuation ||
|
| - opCode == net::WebSocketFrameHeader::kOpCodeText ||
|
| - opCode == net::WebSocketFrameHeader::kOpCodeBinary);
|
| +WebSocketMessageType OpCodeToMessageType(OpCode opCode) {
|
| + DCHECK(opCode == WebSocketFrameHeader::kOpCodeContinuation ||
|
| + opCode == WebSocketFrameHeader::kOpCodeText ||
|
| + opCode == WebSocketFrameHeader::kOpCodeBinary);
|
| // This cast is guaranteed valid by the static_assert() statements above.
|
| return static_cast<WebSocketMessageType>(opCode);
|
| }
|
|
|
| ChannelState StateCast(WebSocketDispatcherHost::WebSocketHostState host_state) {
|
| - const WebSocketDispatcherHost::WebSocketHostState WEBSOCKET_HOST_ALIVE =
|
| - WebSocketDispatcherHost::WEBSOCKET_HOST_ALIVE;
|
| - const WebSocketDispatcherHost::WebSocketHostState WEBSOCKET_HOST_DELETED =
|
| - WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED;
|
| -
|
| DCHECK(host_state == WEBSOCKET_HOST_ALIVE ||
|
| host_state == WEBSOCKET_HOST_DELETED);
|
| // These compile asserts verify that we can get away with using static_cast<>
|
| // for the conversion.
|
| - static_assert(static_cast<ChannelState>(WEBSOCKET_HOST_ALIVE) ==
|
| - net::WebSocketEventInterface::CHANNEL_ALIVE,
|
| - "enum values must match for state_alive");
|
| - static_assert(static_cast<ChannelState>(WEBSOCKET_HOST_DELETED) ==
|
| - net::WebSocketEventInterface::CHANNEL_DELETED,
|
| - "enum values must match for state_deleted");
|
| + static_assert(
|
| + static_cast<ChannelState>(WEBSOCKET_HOST_ALIVE) == CHANNEL_ALIVE,
|
| + "enum values must match for state_alive");
|
| + static_assert(
|
| + static_cast<ChannelState>(WEBSOCKET_HOST_DELETED) == CHANNEL_DELETED,
|
| + "enum values must match for state_deleted");
|
| return static_cast<ChannelState>(host_state);
|
| }
|
|
|
| @@ -107,8 +108,8 @@ class SendChannelImpl final : public WebSocketBlobSender::Channel {
|
| }
|
|
|
| ChannelState SendFrame(bool fin, const std::vector<char>& data) override {
|
| - int opcode = first_frame_ ? net::WebSocketFrameHeader::kOpCodeBinary
|
| - : net::WebSocketFrameHeader::kOpCodeContinuation;
|
| + int opcode = first_frame_ ? WebSocketFrameHeader::kOpCodeBinary
|
| + : WebSocketFrameHeader::kOpCodeContinuation;
|
| first_frame_ = false;
|
| return channel_->SendFrame(fin, opcode, data);
|
| }
|
| @@ -152,7 +153,7 @@ class WebSocketHost::WebSocketEventHandler final
|
| ChannelState OnFinishOpeningHandshake(
|
| scoped_ptr<net::WebSocketHandshakeResponseInfo> response) override;
|
| ChannelState OnSSLCertificateError(
|
| - scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
|
| + scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks,
|
| const GURL& url,
|
| const net::SSLInfo& ssl_info,
|
| bool fatal) override;
|
| @@ -161,7 +162,7 @@ class WebSocketHost::WebSocketEventHandler final
|
| class SSLErrorHandlerDelegate final : public SSLErrorHandler::Delegate {
|
| public:
|
| SSLErrorHandlerDelegate(
|
| - scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks);
|
| + scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks);
|
| ~SSLErrorHandlerDelegate() override;
|
|
|
| base::WeakPtr<SSLErrorHandler::Delegate> GetWeakPtr();
|
| @@ -171,7 +172,7 @@ class WebSocketHost::WebSocketEventHandler final
|
| void ContinueSSLRequest() override;
|
|
|
| private:
|
| - scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks_;
|
| + scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks_;
|
| base::WeakPtrFactory<SSLErrorHandlerDelegate> weak_ptr_factory_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(SSLErrorHandlerDelegate);
|
| @@ -204,8 +205,8 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnAddChannelResponse(
|
| const std::string& selected_protocol,
|
| const std::string& extensions) {
|
| DVLOG(3) << "WebSocketEventHandler::OnAddChannelResponse"
|
| - << " routing_id=" << routing_id_
|
| - << " selected_protocol=\"" << selected_protocol << "\""
|
| + << " routing_id=" << routing_id_ << " selected_protocol=\""
|
| + << selected_protocol << "\""
|
| << " extensions=\"" << extensions << "\"";
|
|
|
| return StateCast(dispatcher_->SendAddChannelResponse(
|
| @@ -214,14 +215,27 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnAddChannelResponse(
|
|
|
| ChannelState WebSocketHost::WebSocketEventHandler::OnDataFrame(
|
| bool fin,
|
| - net::WebSocketFrameHeader::OpCode type,
|
| + OpCode type,
|
| const std::vector<char>& data) {
|
| DVLOG(3) << "WebSocketEventHandler::OnDataFrame"
|
| << " routing_id=" << routing_id_ << " fin=" << fin
|
| << " type=" << type << " data is " << data.size() << " bytes";
|
|
|
| - return StateCast(dispatcher_->SendFrame(routing_id_, fin,
|
| - OpCodeToMessageType(type), data));
|
| + host_->receive_quota_multiplexer_.ReceivedFrame(data.size());
|
| + if (host_->ShouldQueue(data.size())) {
|
| + host_->AppendToQueue(fin, type, data);
|
| + return CHANNEL_ALIVE;
|
| + }
|
| +
|
| + bool started_blob_receive = false;
|
| + if (host_->SendFrameInternal(fin, type, data, &started_blob_receive) ==
|
| + CHANNEL_DELETED) {
|
| + // |this| has been destroyed here.
|
| + return CHANNEL_DELETED;
|
| + }
|
| + if (started_blob_receive)
|
| + host_->AppendToQueue(fin, type, data);
|
| + return CHANNEL_ALIVE;
|
| }
|
|
|
| ChannelState WebSocketHost::WebSocketEventHandler::OnClosingHandshake() {
|
| @@ -249,6 +263,11 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnDropChannel(
|
| << " routing_id=" << routing_id_ << " was_clean=" << was_clean
|
| << " code=" << code << " reason=\"" << reason << "\"";
|
|
|
| + if (host_->ShouldDelayDropChannel()) {
|
| + DVLOG(3) << "WebSocketEventHandler::OnDropChannel delayed";
|
| + host_->SetPendingDropChannel(was_clean, code, reason);
|
| + return CHANNEL_ALIVE;
|
| + }
|
| return StateCast(
|
| dispatcher_->DoDropChannel(routing_id_, was_clean, code, reason));
|
| }
|
| @@ -268,7 +287,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnStartOpeningHandshake(
|
| << "should_send=" << should_send;
|
|
|
| if (!should_send)
|
| - return WebSocketEventInterface::CHANNEL_ALIVE;
|
| + return CHANNEL_ALIVE;
|
|
|
| WebSocketHandshakeRequest request_to_pass;
|
| request_to_pass.url.Swap(&request->url);
|
| @@ -292,7 +311,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnFinishOpeningHandshake(
|
| << "should_send=" << should_send;
|
|
|
| if (!should_send)
|
| - return WebSocketEventInterface::CHANNEL_ALIVE;
|
| + return CHANNEL_ALIVE;
|
|
|
| WebSocketHandshakeResponse response_to_pass;
|
| response_to_pass.url.Swap(&response->url);
|
| @@ -312,7 +331,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnFinishOpeningHandshake(
|
| }
|
|
|
| ChannelState WebSocketHost::WebSocketEventHandler::OnSSLCertificateError(
|
| - scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
|
| + scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks,
|
| const GURL& url,
|
| const net::SSLInfo& ssl_info,
|
| bool fatal) {
|
| @@ -325,12 +344,12 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnSSLCertificateError(
|
| ssl_error_handler_delegate_->GetWeakPtr(), url,
|
| dispatcher_->render_process_id(), render_frame_id_, ssl_info, fatal);
|
| // The above method is always asynchronous.
|
| - return WebSocketEventInterface::CHANNEL_ALIVE;
|
| + return CHANNEL_ALIVE;
|
| }
|
|
|
| WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate::
|
| SSLErrorHandlerDelegate(
|
| - scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks)
|
| + scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks)
|
| : callbacks_(std::move(callbacks)), weak_ptr_factory_(this) {}
|
|
|
| WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate::
|
| @@ -356,6 +375,123 @@ void WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate::
|
| callbacks_->ContinueSSLRequest();
|
| }
|
|
|
| +WebSocketHost::ReceiveQuotaMultiplexer::ReceiveQuotaMultiplexer() {}
|
| +
|
| +void WebSocketHost::ReceiveQuotaMultiplexer::SetChannel(
|
| + net::WebSocketChannel* channel) {
|
| + channel_ = channel;
|
| + if (!channel) {
|
| + channel_quota_ = 0;
|
| + }
|
| +}
|
| +
|
| +void WebSocketHost::ReceiveQuotaMultiplexer::SetConsumer(
|
| + ReceiveQuotaConsumer* consumer) {
|
| + DCHECK(consumer);
|
| + current_consumer_ = consumer;
|
| +}
|
| +
|
| +// Quota can be added to any consumer at any time.
|
| +bool WebSocketHost::ReceiveQuotaMultiplexer::AddQuota(
|
| + ReceiveQuotaProvider* provider,
|
| + size_t quota) {
|
| + *provider += quota;
|
| + return provider == current_consumer_;
|
| +}
|
| +
|
| +// Quota can only be used from the current consumer.
|
| +size_t WebSocketHost::ReceiveQuotaMultiplexer::AvailableQuota() const {
|
| + DCHECK(current_consumer_);
|
| + return *current_consumer_;
|
| +}
|
| +
|
| +// Quota is only consumed by the current consumer.
|
| +void WebSocketHost::ReceiveQuotaMultiplexer::ConsumedQuota(size_t quota) {
|
| + DCHECK_GE(*current_consumer_, quota);
|
| + *current_consumer_ -= quota;
|
| +}
|
| +
|
| +void WebSocketHost::ReceiveQuotaMultiplexer::ReceivedFrame(size_t size) {
|
| + DCHECK(channel_);
|
| + DCHECK_GE(channel_quota_, size);
|
| + channel_quota_ -= size;
|
| +}
|
| +
|
| +void WebSocketHost::ReceiveQuotaMultiplexer::PublishMoreQuotaIfAvailable() {
|
| + if (!channel_)
|
| + return;
|
| + DCHECK(current_consumer_);
|
| + if (*current_consumer_ > channel_quota_) {
|
| + size_t additional_quota =
|
| + *current_consumer_ - channel_quota_;
|
| + channel_quota_ = *current_consumer_;
|
| + // SendFlowControl may call back into OnDataFrame, so order matters here.
|
| + channel_->SendFlowControl(additional_quota);
|
| + }
|
| +}
|
| +
|
| +WebSocketHost::QueuedFrame::QueuedFrame(bool fin,
|
| + OpCode type,
|
| + const std::vector<char>& data)
|
| + : fin(fin), type(type), data(data) {}
|
| +
|
| +WebSocketHost::QueuedFrame::QueuedFrame(QueuedFrame&& rhs)
|
| + : fin(rhs.fin), type(rhs.type), data(std::move(rhs.data)) {}
|
| +
|
| +WebSocketHost::QueuedFrame::~QueuedFrame() {}
|
| +
|
| +WebSocketHost::QueuedFrame& WebSocketHost::QueuedFrame::operator=(
|
| + QueuedFrame&& rhs) {
|
| + if (this == &rhs)
|
| + return *this;
|
| + fin = rhs.fin;
|
| + type = rhs.type;
|
| + data = std::move(rhs.data);
|
| + return *this;
|
| +}
|
| +
|
| +struct WebSocketHost::DropChannelParameters {
|
| + DropChannelParameters(bool was_clean,
|
| + uint16_t code,
|
| + const std::string& reason)
|
| + : was_clean(was_clean), code(code), reason(reason) {}
|
| +
|
| + bool was_clean;
|
| + uint16_t code;
|
| + std::string reason;
|
| +};
|
| +
|
| +class WebSocketHost::BlobReceiverClient final
|
| + : public WebSocketBlobReceiver::Client {
|
| + public:
|
| + explicit BlobReceiverClient(WebSocketHost* host) : host_(host) {}
|
| +
|
| + // Implementation of WebSocketBlobReceiver::Client
|
| + void BlobCreated(scoped_ptr<storage::BlobDataHandle> blob_data_handle,
|
| + uint64_t size) override {
|
| + host_->FinishReceivingBlob(std::move(blob_data_handle), size);
|
| + // |this| may be destroyed here.
|
| + }
|
| +
|
| + void BlobFailed(int net_error_code) override {
|
| + host_->BlobReceiveFailed(net_error_code);
|
| + // |this| is destroyed here.
|
| + }
|
| +
|
| + void AddFlowControlQuota(size_t quota) override {
|
| + if (host_->receive_quota_multiplexer_.AddQuota(&host_->blob_receiver_quota_,
|
| + quota)) {
|
| + host_->FlushQueueAndPublishQuotaIfAvailable();
|
| + // |this| may be destroyed here.
|
| + }
|
| + }
|
| +
|
| + private:
|
| + WebSocketHost* host_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(BlobReceiverClient);
|
| +};
|
| +
|
| WebSocketHost::WebSocketHost(int routing_id,
|
| WebSocketDispatcherHost* dispatcher,
|
| net::URLRequestContext* url_request_context,
|
| @@ -366,6 +502,9 @@ WebSocketHost::WebSocketHost(int routing_id,
|
| delay_(delay),
|
| pending_flow_control_quota_(0),
|
| handshake_succeeded_(false),
|
| + binary_type_(WebSocketBinaryType::BLOB),
|
| + renderer_quota_(0),
|
| + blob_receiver_quota_(0),
|
| weak_ptr_factory_(this) {
|
| DVLOG(1) << "WebSocketHost: created routing_id=" << routing_id;
|
| }
|
| @@ -382,6 +521,8 @@ bool WebSocketHost::OnMessageReceived(const IPC::Message& message) {
|
| IPC_BEGIN_MESSAGE_MAP(WebSocketHost, message)
|
| IPC_MESSAGE_HANDLER(WebSocketHostMsg_AddChannelRequest, OnAddChannelRequest)
|
| IPC_MESSAGE_HANDLER(WebSocketHostMsg_SendBlob, OnSendBlob)
|
| + IPC_MESSAGE_HANDLER(WebSocketHostMsg_BinaryTypeChanged, OnBinaryTypeChanged)
|
| + IPC_MESSAGE_HANDLER(WebSocketHostMsg_BlobConfirmed, OnBlobConfirmed)
|
| IPC_MESSAGE_HANDLER(WebSocketMsg_SendFrame, OnSendFrame)
|
| IPC_MESSAGE_HANDLER(WebSocketMsg_FlowControl, OnFlowControl)
|
| IPC_MESSAGE_HANDLER(WebSocketMsg_DropChannel, OnDropChannel)
|
| @@ -445,6 +586,8 @@ void WebSocketHost::AddChannel(
|
| pending_flow_control_quota_));
|
| pending_flow_control_quota_ = 0;
|
| }
|
| + receive_quota_multiplexer_.SetChannel(channel_.get());
|
| + receive_quota_multiplexer_.SetConsumer(&renderer_quota_);
|
|
|
| channel_->SendAddChannelRequest(socket_url, requested_protocols, origin);
|
| // |this| may have been deleted here.
|
| @@ -468,8 +611,7 @@ void WebSocketHost::OnSendBlob(const std::string& uuid,
|
| storage::FileSystemContext* file_system_context =
|
| partition->GetFileSystemContext();
|
|
|
| - net::WebSocketEventInterface::ChannelState channel_state =
|
| - net::WebSocketEventInterface::CHANNEL_ALIVE;
|
| + ChannelState channel_state = CHANNEL_ALIVE;
|
|
|
| // This use of base::Unretained is safe because the WebSocketBlobSender object
|
| // is owned by this object and will not call it back after destruction.
|
| @@ -479,8 +621,7 @@ void WebSocketHost::OnSendBlob(const std::string& uuid,
|
| BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE).get(),
|
| &channel_state,
|
| base::Bind(&WebSocketHost::BlobSendComplete, base::Unretained(this)));
|
| - if (channel_state == net::WebSocketEventInterface::CHANNEL_ALIVE &&
|
| - rv != net::ERR_IO_PENDING)
|
| + if (channel_state == CHANNEL_ALIVE && rv != net::ERR_IO_PENDING)
|
| BlobSendComplete(rv);
|
| // |this| may be destroyed here.
|
| }
|
| @@ -492,6 +633,9 @@ void WebSocketHost::OnSendFrame(bool fin,
|
| << " routing_id=" << routing_id_ << " fin=" << fin
|
| << " type=" << type << " data is " << data.size() << " bytes";
|
|
|
| + if (pending_drop_channel_)
|
| + return;
|
| +
|
| DCHECK(channel_);
|
| if (blob_sender_) {
|
| bad_message::ReceivedBadMessage(
|
| @@ -504,6 +648,11 @@ void WebSocketHost::OnSendFrame(bool fin,
|
| void WebSocketHost::OnFlowControl(int64_t quota) {
|
| DVLOG(3) << "WebSocketHost::OnFlowControl"
|
| << " routing_id=" << routing_id_ << " quota=" << quota;
|
| + if (quota < 0) {
|
| + bad_message::ReceivedBadMessage(dispatcher_,
|
| + bad_message::WSH_NEGATIVE_QUOTA);
|
| + return;
|
| + }
|
|
|
| if (!channel_) {
|
| // WebSocketChannel is not yet created due to the delay introduced by
|
| @@ -513,7 +662,10 @@ void WebSocketHost::OnFlowControl(int64_t quota) {
|
| return;
|
| }
|
|
|
| - channel_->SendFlowControl(quota);
|
| + if (receive_quota_multiplexer_.AddQuota(&renderer_quota_, quota)) {
|
| + FlushQueueAndPublishQuotaIfAvailable();
|
| + // |this| may be destroyed here.
|
| + }
|
| }
|
|
|
| void WebSocketHost::OnDropChannel(bool was_clean,
|
| @@ -529,15 +681,42 @@ void WebSocketHost::OnDropChannel(bool was_clean,
|
| WebSocketDispatcherHost::WebSocketHostState result =
|
| dispatcher_->DoDropChannel(routing_id_, false,
|
| net::kWebSocketErrorAbnormalClosure, "");
|
| - DCHECK_EQ(WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED, result);
|
| + DCHECK_EQ(WEBSOCKET_HOST_DELETED, result);
|
| return;
|
| }
|
|
|
| + if (pending_drop_channel_)
|
| + return;
|
| +
|
| blob_sender_.reset();
|
| // TODO(yhirano): Handle |was_clean| appropriately.
|
| channel_->StartClosingHandshake(code, reason);
|
| }
|
|
|
| +void WebSocketHost::OnBinaryTypeChanged(WebSocketBinaryType new_type) {
|
| + DVLOG(3) << "WebSocketHost::OnSetBinaryType"
|
| + << " routing_id= " << routing_id_
|
| + << " new_type=" << static_cast<int>(new_type);
|
| +
|
| + binary_type_ = new_type;
|
| +}
|
| +
|
| +void WebSocketHost::OnBlobConfirmed() {
|
| + DVLOG(3) << "WebSocketHost::OnBlobConfirmed"
|
| + << " routing_id= " << routing_id_;
|
| +
|
| + if (unconfirmed_blob_queue_.empty()) {
|
| + ReceivedBadMessage(dispatcher_,
|
| + bad_message::WSH_UNSOLICITED_BLOB_CONFIRMATION);
|
| + return;
|
| + }
|
| + unconfirmed_blob_queue_.pop();
|
| + if (pending_drop_channel_ && !ShouldDelayDropChannel()) {
|
| + DoDelayedDropChannel();
|
| + // |this| is destroyed here.
|
| + }
|
| +}
|
| +
|
| void WebSocketHost::BlobSendComplete(int result) {
|
| DVLOG(3) << "WebSocketHost::BlobSendComplete"
|
| << " routing_id=" << routing_id_
|
| @@ -575,4 +754,147 @@ void WebSocketHost::BlobSendComplete(int result) {
|
| }
|
| }
|
|
|
| +bool WebSocketHost::ShouldQueue(size_t data_size) {
|
| + return (!data_frame_queue_.empty() ||
|
| + receive_quota_multiplexer_.AvailableQuota() < data_size ||
|
| + (blob_receiver_ && blob_receiver_->finish_called()));
|
| +}
|
| +
|
| +void WebSocketHost::AppendToQueue(bool fin,
|
| + OpCode type,
|
| + const std::vector<char>& data) {
|
| + data_frame_queue_.emplace(fin, type, data);
|
| +}
|
| +
|
| +void WebSocketHost::FlushQueueAndPublishQuotaIfAvailable() {
|
| + while (!data_frame_queue_.empty() &&
|
| + receive_quota_multiplexer_.AvailableQuota() >=
|
| + data_frame_queue_.front().data.size()) {
|
| + const QueuedFrame& front = data_frame_queue_.front();
|
| + if (blob_receiver_ && blob_receiver_->finish_called())
|
| + return; // without consuming the message.
|
| + bool started_blob_receive = false;
|
| + if (SendFrameInternal(front.fin, front.type, front.data,
|
| + &started_blob_receive) == CHANNEL_DELETED) {
|
| + // |this| has been destroyed here.
|
| + return;
|
| + }
|
| + if (started_blob_receive)
|
| + return; // without consuming the message.
|
| + data_frame_queue_.pop();
|
| + }
|
| + if (data_frame_queue_.empty()) {
|
| + if (pending_drop_channel_ && !ShouldDelayDropChannel()) {
|
| + DoDelayedDropChannel();
|
| + // |this| is destroyed here. No need to publish more quota.
|
| + return;
|
| + }
|
| + receive_quota_multiplexer_.PublishMoreQuotaIfAvailable();
|
| + // |this| may be destroyed here.
|
| + }
|
| +}
|
| +
|
| +ChannelState WebSocketHost::SendFrameInternal(bool fin,
|
| + OpCode type,
|
| + const std::vector<char>& data,
|
| + bool* started_blob_receive) {
|
| + receive_quota_multiplexer_.ConsumedQuota(data.size());
|
| + if (blob_receiver_) {
|
| + DCHECK_NE(type, WebSocketFrameHeader::kOpCodeText);
|
| + int rv = blob_receiver_->AppendData(data);
|
| + if (rv != net::ERR_IO_PENDING && rv < 0) {
|
| + BlobReceiveFailed(rv);
|
| + // |this| is destroyed.
|
| + return CHANNEL_DELETED;
|
| + }
|
| + if (fin) {
|
| + int rv = blob_receiver_->Finish();
|
| + if (rv != net::ERR_IO_PENDING && rv < 0) {
|
| + BlobReceiveFailed(rv);
|
| + // |this| is destroyed.
|
| + return CHANNEL_DELETED;
|
| + }
|
| + }
|
| + return CHANNEL_ALIVE;
|
| + }
|
| +
|
| + if (type == WebSocketFrameHeader::kOpCodeBinary &&
|
| + binary_type_ == WebSocketBinaryType::BLOB) {
|
| + StartReceivingBlob(fin, data);
|
| + *started_blob_receive = true;
|
| + return CHANNEL_ALIVE;
|
| + }
|
| +
|
| + return StateCast(dispatcher_->SendFrame(routing_id_, fin,
|
| + OpCodeToMessageType(type), data));
|
| +}
|
| +
|
| +void WebSocketHost::StartReceivingBlob(bool fin,
|
| + const std::vector<char>& data) {
|
| + DCHECK(!blob_receiver_);
|
| +
|
| + blob_receiver_.reset(
|
| + new WebSocketBlobReceiver(make_scoped_ptr(new BlobReceiverClient(this)),
|
| + dispatcher_->blob_storage_context()));
|
| + receive_quota_multiplexer_.SetConsumer(&blob_receiver_quota_);
|
| + DCHECK_EQ(receive_quota_multiplexer_.AvailableQuota(), 0u);
|
| + blob_receiver_->Start();
|
| +}
|
| +
|
| +void WebSocketHost::FinishReceivingBlob(
|
| + scoped_ptr<storage::BlobDataHandle> blob_data_handle,
|
| + uint64_t size) {
|
| + const std::string& uuid = blob_data_handle->uuid();
|
| + unconfirmed_blob_queue_.push(std::move(blob_data_handle));
|
| + if (dispatcher_->BlobReceived(routing_id_, uuid, size) ==
|
| + WEBSOCKET_HOST_DELETED) {
|
| + // |this| has been destroyed.
|
| + return;
|
| + }
|
| + blob_receiver_.reset();
|
| + receive_quota_multiplexer_.SetConsumer(&renderer_quota_);
|
| + blob_receiver_quota_ = 0;
|
| + FlushQueueAndPublishQuotaIfAvailable();
|
| + // |this| may be destroyed here.
|
| +}
|
| +
|
| +void WebSocketHost::BlobReceiveFailed(int net_error_code) {
|
| + ignore_result(dispatcher_->NotifyFailure(
|
| + routing_id_,
|
| + "Blob receive failed: " + net::ErrorToString(net_error_code)));
|
| + // |this| is destroyed here.
|
| +}
|
| +
|
| +bool WebSocketHost::ShouldDelayDropChannel() const {
|
| + return blob_receiver_ || !data_frame_queue_.empty() ||
|
| + !unconfirmed_blob_queue_.empty();
|
| +}
|
| +
|
| +void WebSocketHost::SetPendingDropChannel(bool was_clean,
|
| + uint16_t code,
|
| + const std::string& reason) {
|
| + DCHECK(!pending_drop_channel_);
|
| + pending_drop_channel_.reset(
|
| + new DropChannelParameters(was_clean, code, reason));
|
| + // Should not send any more messages on the channel.
|
| + blob_sender_.reset();
|
| + // Should not supply any more quota to the channel.
|
| + receive_quota_multiplexer_.SetChannel(nullptr);
|
| +}
|
| +
|
| +void WebSocketHost::DoDelayedDropChannel() {
|
| + DCHECK(!ShouldDelayDropChannel());
|
| + scoped_ptr<DropChannelParameters> parameters =
|
| + std::move(pending_drop_channel_);
|
| + DVLOG(3)
|
| + << "WebSocketHost::DoDelayedDropChannel() performing delayed DropChannel"
|
| + << " routing_id=" << routing_id_ << " was_clean=" << parameters->was_clean
|
| + << " code=" << parameters->code << " reason=\"" << parameters->reason
|
| + << "\"";
|
| + ignore_result(dispatcher_->DoDropChannel(routing_id_, parameters->was_clean,
|
| + parameters->code,
|
| + parameters->reason));
|
| + // |this| is destroyed here.
|
| +}
|
| +
|
| } // namespace content
|
|
|