Index: content/browser/renderer_host/websocket_host.cc |
diff --git a/content/browser/renderer_host/websocket_host.cc b/content/browser/renderer_host/websocket_host.cc |
index d0715889db733e6a55e4187fb5752372b909f85d..4a291a0b51e3103d425e6e79ce202def4dd42428 100644 |
--- a/content/browser/renderer_host/websocket_host.cc |
+++ b/content/browser/renderer_host/websocket_host.cc |
@@ -17,6 +17,7 @@ |
#include "base/strings/stringprintf.h" |
#include "base/thread_task_runner_handle.h" |
#include "content/browser/bad_message.h" |
+#include "content/browser/renderer_host/websocket_blob_receiver.h" |
#include "content/browser/renderer_host/websocket_blob_sender.h" |
#include "content/browser/renderer_host/websocket_dispatcher_host.h" |
#include "content/browser/ssl/ssl_error_handler.h" |
@@ -33,7 +34,6 @@ |
#include "net/ssl/ssl_info.h" |
#include "net/websockets/websocket_channel.h" |
#include "net/websockets/websocket_errors.h" |
-#include "net/websockets/websocket_event_interface.h" |
#include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode |
#include "net/websockets/websocket_handshake_request_info.h" |
#include "net/websockets/websocket_handshake_response_info.h" |
@@ -43,55 +43,56 @@ namespace content { |
namespace { |
-typedef net::WebSocketEventInterface::ChannelState ChannelState; |
+using net::WebSocketEventInterface; |
+using net::WebSocketFrameHeader; |
+using ChannelState = WebSocketEventInterface::ChannelState; |
+using OpCode = WebSocketFrameHeader::OpCode; |
+ |
+const auto CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE; |
+const auto CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED; |
+const auto WEBSOCKET_HOST_ALIVE = WebSocketDispatcherHost::WEBSOCKET_HOST_ALIVE; |
+const auto WEBSOCKET_HOST_DELETED = |
+ WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED; |
// Convert a content::WebSocketMessageType to a |
-// net::WebSocketFrameHeader::OpCode |
-net::WebSocketFrameHeader::OpCode MessageTypeToOpCode( |
- WebSocketMessageType type) { |
+// WebSocketFrameHeader::OpCode |
+OpCode MessageTypeToOpCode(WebSocketMessageType type) { |
DCHECK(type == WEB_SOCKET_MESSAGE_TYPE_CONTINUATION || |
type == WEB_SOCKET_MESSAGE_TYPE_TEXT || |
type == WEB_SOCKET_MESSAGE_TYPE_BINARY); |
- typedef net::WebSocketFrameHeader::OpCode OpCode; |
// These compile asserts verify that the same underlying values are used for |
// both types, so we can simply cast between them. |
static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_CONTINUATION) == |
- net::WebSocketFrameHeader::kOpCodeContinuation, |
+ WebSocketFrameHeader::kOpCodeContinuation, |
"enum values must match for opcode continuation"); |
static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_TEXT) == |
- net::WebSocketFrameHeader::kOpCodeText, |
+ WebSocketFrameHeader::kOpCodeText, |
"enum values must match for opcode text"); |
static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_BINARY) == |
- net::WebSocketFrameHeader::kOpCodeBinary, |
+ WebSocketFrameHeader::kOpCodeBinary, |
"enum values must match for opcode binary"); |
return static_cast<OpCode>(type); |
} |
-WebSocketMessageType OpCodeToMessageType( |
- net::WebSocketFrameHeader::OpCode opCode) { |
- DCHECK(opCode == net::WebSocketFrameHeader::kOpCodeContinuation || |
- opCode == net::WebSocketFrameHeader::kOpCodeText || |
- opCode == net::WebSocketFrameHeader::kOpCodeBinary); |
+WebSocketMessageType OpCodeToMessageType(OpCode opCode) { |
+ DCHECK(opCode == WebSocketFrameHeader::kOpCodeContinuation || |
+ opCode == WebSocketFrameHeader::kOpCodeText || |
+ opCode == WebSocketFrameHeader::kOpCodeBinary); |
// This cast is guaranteed valid by the static_assert() statements above. |
return static_cast<WebSocketMessageType>(opCode); |
} |
ChannelState StateCast(WebSocketDispatcherHost::WebSocketHostState host_state) { |
- const WebSocketDispatcherHost::WebSocketHostState WEBSOCKET_HOST_ALIVE = |
- WebSocketDispatcherHost::WEBSOCKET_HOST_ALIVE; |
- const WebSocketDispatcherHost::WebSocketHostState WEBSOCKET_HOST_DELETED = |
- WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED; |
- |
DCHECK(host_state == WEBSOCKET_HOST_ALIVE || |
host_state == WEBSOCKET_HOST_DELETED); |
// These compile asserts verify that we can get away with using static_cast<> |
// for the conversion. |
- static_assert(static_cast<ChannelState>(WEBSOCKET_HOST_ALIVE) == |
- net::WebSocketEventInterface::CHANNEL_ALIVE, |
- "enum values must match for state_alive"); |
- static_assert(static_cast<ChannelState>(WEBSOCKET_HOST_DELETED) == |
- net::WebSocketEventInterface::CHANNEL_DELETED, |
- "enum values must match for state_deleted"); |
+ static_assert( |
+ static_cast<ChannelState>(WEBSOCKET_HOST_ALIVE) == CHANNEL_ALIVE, |
+ "enum values must match for state_alive"); |
+ static_assert( |
+ static_cast<ChannelState>(WEBSOCKET_HOST_DELETED) == CHANNEL_DELETED, |
+ "enum values must match for state_deleted"); |
return static_cast<ChannelState>(host_state); |
} |
@@ -107,8 +108,8 @@ class SendChannelImpl final : public WebSocketBlobSender::Channel { |
} |
ChannelState SendFrame(bool fin, const std::vector<char>& data) override { |
- int opcode = first_frame_ ? net::WebSocketFrameHeader::kOpCodeBinary |
- : net::WebSocketFrameHeader::kOpCodeContinuation; |
+ int opcode = first_frame_ ? WebSocketFrameHeader::kOpCodeBinary |
+ : WebSocketFrameHeader::kOpCodeContinuation; |
first_frame_ = false; |
return channel_->SendFrame(fin, opcode, data); |
} |
@@ -152,7 +153,7 @@ class WebSocketHost::WebSocketEventHandler final |
ChannelState OnFinishOpeningHandshake( |
scoped_ptr<net::WebSocketHandshakeResponseInfo> response) override; |
ChannelState OnSSLCertificateError( |
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, |
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks, |
const GURL& url, |
const net::SSLInfo& ssl_info, |
bool fatal) override; |
@@ -161,7 +162,7 @@ class WebSocketHost::WebSocketEventHandler final |
class SSLErrorHandlerDelegate final : public SSLErrorHandler::Delegate { |
public: |
SSLErrorHandlerDelegate( |
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks); |
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks); |
~SSLErrorHandlerDelegate() override; |
base::WeakPtr<SSLErrorHandler::Delegate> GetWeakPtr(); |
@@ -171,7 +172,7 @@ class WebSocketHost::WebSocketEventHandler final |
void ContinueSSLRequest() override; |
private: |
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks_; |
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks_; |
base::WeakPtrFactory<SSLErrorHandlerDelegate> weak_ptr_factory_; |
DISALLOW_COPY_AND_ASSIGN(SSLErrorHandlerDelegate); |
@@ -204,8 +205,8 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnAddChannelResponse( |
const std::string& selected_protocol, |
const std::string& extensions) { |
DVLOG(3) << "WebSocketEventHandler::OnAddChannelResponse" |
- << " routing_id=" << routing_id_ |
- << " selected_protocol=\"" << selected_protocol << "\"" |
+ << " routing_id=" << routing_id_ << " selected_protocol=\"" |
+ << selected_protocol << "\"" |
<< " extensions=\"" << extensions << "\""; |
return StateCast(dispatcher_->SendAddChannelResponse( |
@@ -214,14 +215,27 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnAddChannelResponse( |
ChannelState WebSocketHost::WebSocketEventHandler::OnDataFrame( |
bool fin, |
- net::WebSocketFrameHeader::OpCode type, |
+ OpCode type, |
const std::vector<char>& data) { |
DVLOG(3) << "WebSocketEventHandler::OnDataFrame" |
<< " routing_id=" << routing_id_ << " fin=" << fin |
<< " type=" << type << " data is " << data.size() << " bytes"; |
- return StateCast(dispatcher_->SendFrame(routing_id_, fin, |
- OpCodeToMessageType(type), data)); |
+ host_->receive_quota_multiplexer_.ReceivedFrame(data.size()); |
+ if (host_->ShouldQueue(data.size())) { |
+ host_->AppendToQueue(fin, type, data); |
+ return CHANNEL_ALIVE; |
+ } |
+ |
+ bool started_blob_receive = false; |
+ if (host_->SendFrameInternal(fin, type, data, &started_blob_receive) == |
+ CHANNEL_DELETED) { |
+ // |this| has been destroyed here. |
+ return CHANNEL_DELETED; |
+ } |
+ if (started_blob_receive) |
+ host_->AppendToQueue(fin, type, data); |
+ return CHANNEL_ALIVE; |
} |
ChannelState WebSocketHost::WebSocketEventHandler::OnClosingHandshake() { |
@@ -249,6 +263,11 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnDropChannel( |
<< " routing_id=" << routing_id_ << " was_clean=" << was_clean |
<< " code=" << code << " reason=\"" << reason << "\""; |
+ if (host_->ShouldDelayDropChannel()) { |
+ DVLOG(3) << "WebSocketEventHandler::OnDropChannel delayed"; |
+ host_->SetPendingDropChannel(was_clean, code, reason); |
+ return CHANNEL_ALIVE; |
+ } |
return StateCast( |
dispatcher_->DoDropChannel(routing_id_, was_clean, code, reason)); |
} |
@@ -268,7 +287,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnStartOpeningHandshake( |
<< "should_send=" << should_send; |
if (!should_send) |
- return WebSocketEventInterface::CHANNEL_ALIVE; |
+ return CHANNEL_ALIVE; |
WebSocketHandshakeRequest request_to_pass; |
request_to_pass.url.Swap(&request->url); |
@@ -292,7 +311,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnFinishOpeningHandshake( |
<< "should_send=" << should_send; |
if (!should_send) |
- return WebSocketEventInterface::CHANNEL_ALIVE; |
+ return CHANNEL_ALIVE; |
WebSocketHandshakeResponse response_to_pass; |
response_to_pass.url.Swap(&response->url); |
@@ -312,7 +331,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnFinishOpeningHandshake( |
} |
ChannelState WebSocketHost::WebSocketEventHandler::OnSSLCertificateError( |
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, |
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks, |
const GURL& url, |
const net::SSLInfo& ssl_info, |
bool fatal) { |
@@ -325,12 +344,12 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnSSLCertificateError( |
ssl_error_handler_delegate_->GetWeakPtr(), url, |
dispatcher_->render_process_id(), render_frame_id_, ssl_info, fatal); |
// The above method is always asynchronous. |
- return WebSocketEventInterface::CHANNEL_ALIVE; |
+ return CHANNEL_ALIVE; |
} |
WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate:: |
SSLErrorHandlerDelegate( |
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks) |
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks) |
: callbacks_(std::move(callbacks)), weak_ptr_factory_(this) {} |
WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate:: |
@@ -356,6 +375,123 @@ void WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate:: |
callbacks_->ContinueSSLRequest(); |
} |
+WebSocketHost::ReceiveQuotaMultiplexer::ReceiveQuotaMultiplexer() {} |
+ |
+void WebSocketHost::ReceiveQuotaMultiplexer::SetChannel( |
+ net::WebSocketChannel* channel) { |
+ channel_ = channel; |
+ if (!channel) { |
+ channel_quota_ = 0; |
+ } |
+} |
+ |
+void WebSocketHost::ReceiveQuotaMultiplexer::SetConsumer( |
+ ReceiveQuotaConsumer* consumer) { |
+ DCHECK(consumer); |
+ current_consumer_ = consumer; |
+} |
+ |
+// Quota can be added to any consumer at any time. |
+bool WebSocketHost::ReceiveQuotaMultiplexer::AddQuota( |
+ ReceiveQuotaProvider* provider, |
+ size_t quota) { |
+ *provider += quota; |
+ return provider == current_consumer_; |
+} |
+ |
+// Quota can only be used from the current consumer. |
+size_t WebSocketHost::ReceiveQuotaMultiplexer::AvailableQuota() const { |
+ DCHECK(current_consumer_); |
+ return *current_consumer_; |
+} |
+ |
+// Quota is only consumed by the current consumer. |
+void WebSocketHost::ReceiveQuotaMultiplexer::ConsumedQuota(size_t quota) { |
+ DCHECK_GE(*current_consumer_, quota); |
+ *current_consumer_ -= quota; |
+} |
+ |
+void WebSocketHost::ReceiveQuotaMultiplexer::ReceivedFrame(size_t size) { |
+ DCHECK(channel_); |
+ DCHECK_GE(channel_quota_, size); |
+ channel_quota_ -= size; |
+} |
+ |
+void WebSocketHost::ReceiveQuotaMultiplexer::PublishMoreQuotaIfAvailable() { |
+ if (!channel_) |
+ return; |
+ DCHECK(current_consumer_); |
+ if (*current_consumer_ > channel_quota_) { |
+ size_t additional_quota = |
+ *current_consumer_ - channel_quota_; |
+ channel_quota_ = *current_consumer_; |
+ // SendFlowControl may call back into OnDataFrame, so order matters here. |
+ channel_->SendFlowControl(additional_quota); |
+ } |
+} |
+ |
+WebSocketHost::QueuedFrame::QueuedFrame(bool fin, |
+ OpCode type, |
+ const std::vector<char>& data) |
+ : fin(fin), type(type), data(data) {} |
+ |
+WebSocketHost::QueuedFrame::QueuedFrame(QueuedFrame&& rhs) |
+ : fin(rhs.fin), type(rhs.type), data(std::move(rhs.data)) {} |
+ |
+WebSocketHost::QueuedFrame::~QueuedFrame() {} |
+ |
+WebSocketHost::QueuedFrame& WebSocketHost::QueuedFrame::operator=( |
+ QueuedFrame&& rhs) { |
+ if (this == &rhs) |
+ return *this; |
+ fin = rhs.fin; |
+ type = rhs.type; |
+ data = std::move(rhs.data); |
+ return *this; |
+} |
+ |
+struct WebSocketHost::DropChannelParameters { |
+ DropChannelParameters(bool was_clean, |
+ uint16_t code, |
+ const std::string& reason) |
+ : was_clean(was_clean), code(code), reason(reason) {} |
+ |
+ bool was_clean; |
+ uint16_t code; |
+ std::string reason; |
+}; |
+ |
+class WebSocketHost::BlobReceiverClient final |
+ : public WebSocketBlobReceiver::Client { |
+ public: |
+ explicit BlobReceiverClient(WebSocketHost* host) : host_(host) {} |
+ |
+ // Implementation of WebSocketBlobReceiver::Client |
+ void BlobCreated(scoped_ptr<storage::BlobDataHandle> blob_data_handle, |
+ uint64_t size) override { |
+ host_->FinishReceivingBlob(std::move(blob_data_handle), size); |
+ // |this| may be destroyed here. |
+ } |
+ |
+ void BlobFailed(int net_error_code) override { |
+ host_->BlobReceiveFailed(net_error_code); |
+ // |this| is destroyed here. |
+ } |
+ |
+ void AddFlowControlQuota(size_t quota) override { |
+ if (host_->receive_quota_multiplexer_.AddQuota(&host_->blob_receiver_quota_, |
+ quota)) { |
+ host_->FlushQueueAndPublishQuotaIfAvailable(); |
+ // |this| may be destroyed here. |
+ } |
+ } |
+ |
+ private: |
+ WebSocketHost* host_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(BlobReceiverClient); |
+}; |
+ |
WebSocketHost::WebSocketHost(int routing_id, |
WebSocketDispatcherHost* dispatcher, |
net::URLRequestContext* url_request_context, |
@@ -366,6 +502,9 @@ WebSocketHost::WebSocketHost(int routing_id, |
delay_(delay), |
pending_flow_control_quota_(0), |
handshake_succeeded_(false), |
+ binary_type_(WebSocketBinaryType::BLOB), |
+ renderer_quota_(0), |
+ blob_receiver_quota_(0), |
weak_ptr_factory_(this) { |
DVLOG(1) << "WebSocketHost: created routing_id=" << routing_id; |
} |
@@ -382,6 +521,8 @@ bool WebSocketHost::OnMessageReceived(const IPC::Message& message) { |
IPC_BEGIN_MESSAGE_MAP(WebSocketHost, message) |
IPC_MESSAGE_HANDLER(WebSocketHostMsg_AddChannelRequest, OnAddChannelRequest) |
IPC_MESSAGE_HANDLER(WebSocketHostMsg_SendBlob, OnSendBlob) |
+ IPC_MESSAGE_HANDLER(WebSocketHostMsg_BinaryTypeChanged, OnBinaryTypeChanged) |
+ IPC_MESSAGE_HANDLER(WebSocketHostMsg_BlobConfirmed, OnBlobConfirmed) |
IPC_MESSAGE_HANDLER(WebSocketMsg_SendFrame, OnSendFrame) |
IPC_MESSAGE_HANDLER(WebSocketMsg_FlowControl, OnFlowControl) |
IPC_MESSAGE_HANDLER(WebSocketMsg_DropChannel, OnDropChannel) |
@@ -445,6 +586,8 @@ void WebSocketHost::AddChannel( |
pending_flow_control_quota_)); |
pending_flow_control_quota_ = 0; |
} |
+ receive_quota_multiplexer_.SetChannel(channel_.get()); |
+ receive_quota_multiplexer_.SetConsumer(&renderer_quota_); |
channel_->SendAddChannelRequest(socket_url, requested_protocols, origin); |
// |this| may have been deleted here. |
@@ -468,8 +611,7 @@ void WebSocketHost::OnSendBlob(const std::string& uuid, |
storage::FileSystemContext* file_system_context = |
partition->GetFileSystemContext(); |
- net::WebSocketEventInterface::ChannelState channel_state = |
- net::WebSocketEventInterface::CHANNEL_ALIVE; |
+ ChannelState channel_state = CHANNEL_ALIVE; |
// This use of base::Unretained is safe because the WebSocketBlobSender object |
// is owned by this object and will not call it back after destruction. |
@@ -479,8 +621,7 @@ void WebSocketHost::OnSendBlob(const std::string& uuid, |
BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE).get(), |
&channel_state, |
base::Bind(&WebSocketHost::BlobSendComplete, base::Unretained(this))); |
- if (channel_state == net::WebSocketEventInterface::CHANNEL_ALIVE && |
- rv != net::ERR_IO_PENDING) |
+ if (channel_state == CHANNEL_ALIVE && rv != net::ERR_IO_PENDING) |
BlobSendComplete(rv); |
// |this| may be destroyed here. |
} |
@@ -492,6 +633,9 @@ void WebSocketHost::OnSendFrame(bool fin, |
<< " routing_id=" << routing_id_ << " fin=" << fin |
<< " type=" << type << " data is " << data.size() << " bytes"; |
+ if (pending_drop_channel_) |
+ return; |
+ |
DCHECK(channel_); |
if (blob_sender_) { |
bad_message::ReceivedBadMessage( |
@@ -504,6 +648,11 @@ void WebSocketHost::OnSendFrame(bool fin, |
void WebSocketHost::OnFlowControl(int64_t quota) { |
DVLOG(3) << "WebSocketHost::OnFlowControl" |
<< " routing_id=" << routing_id_ << " quota=" << quota; |
+ if (quota < 0) { |
+ bad_message::ReceivedBadMessage(dispatcher_, |
+ bad_message::WSH_NEGATIVE_QUOTA); |
+ return; |
+ } |
if (!channel_) { |
// WebSocketChannel is not yet created due to the delay introduced by |
@@ -513,7 +662,10 @@ void WebSocketHost::OnFlowControl(int64_t quota) { |
return; |
} |
- channel_->SendFlowControl(quota); |
+ if (receive_quota_multiplexer_.AddQuota(&renderer_quota_, quota)) { |
+ FlushQueueAndPublishQuotaIfAvailable(); |
+ // |this| may be destroyed here. |
+ } |
} |
void WebSocketHost::OnDropChannel(bool was_clean, |
@@ -529,15 +681,42 @@ void WebSocketHost::OnDropChannel(bool was_clean, |
WebSocketDispatcherHost::WebSocketHostState result = |
dispatcher_->DoDropChannel(routing_id_, false, |
net::kWebSocketErrorAbnormalClosure, ""); |
- DCHECK_EQ(WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED, result); |
+ DCHECK_EQ(WEBSOCKET_HOST_DELETED, result); |
return; |
} |
+ if (pending_drop_channel_) |
+ return; |
+ |
blob_sender_.reset(); |
// TODO(yhirano): Handle |was_clean| appropriately. |
channel_->StartClosingHandshake(code, reason); |
} |
+void WebSocketHost::OnBinaryTypeChanged(WebSocketBinaryType new_type) { |
+ DVLOG(3) << "WebSocketHost::OnSetBinaryType" |
+ << " routing_id= " << routing_id_ |
+ << " new_type=" << static_cast<int>(new_type); |
+ |
+ binary_type_ = new_type; |
+} |
+ |
+void WebSocketHost::OnBlobConfirmed() { |
+ DVLOG(3) << "WebSocketHost::OnBlobConfirmed" |
+ << " routing_id= " << routing_id_; |
+ |
+ if (unconfirmed_blob_queue_.empty()) { |
+ ReceivedBadMessage(dispatcher_, |
+ bad_message::WSH_UNSOLICITED_BLOB_CONFIRMATION); |
+ return; |
+ } |
+ unconfirmed_blob_queue_.pop(); |
+ if (pending_drop_channel_ && !ShouldDelayDropChannel()) { |
+ DoDelayedDropChannel(); |
+ // |this| is destroyed here. |
+ } |
+} |
+ |
void WebSocketHost::BlobSendComplete(int result) { |
DVLOG(3) << "WebSocketHost::BlobSendComplete" |
<< " routing_id=" << routing_id_ |
@@ -575,4 +754,147 @@ void WebSocketHost::BlobSendComplete(int result) { |
} |
} |
+bool WebSocketHost::ShouldQueue(size_t data_size) { |
+ return (!data_frame_queue_.empty() || |
+ receive_quota_multiplexer_.AvailableQuota() < data_size || |
+ (blob_receiver_ && blob_receiver_->finish_called())); |
+} |
+ |
+void WebSocketHost::AppendToQueue(bool fin, |
+ OpCode type, |
+ const std::vector<char>& data) { |
+ data_frame_queue_.emplace(fin, type, data); |
+} |
+ |
+void WebSocketHost::FlushQueueAndPublishQuotaIfAvailable() { |
+ while (!data_frame_queue_.empty() && |
+ receive_quota_multiplexer_.AvailableQuota() >= |
+ data_frame_queue_.front().data.size()) { |
+ const QueuedFrame& front = data_frame_queue_.front(); |
+ if (blob_receiver_ && blob_receiver_->finish_called()) |
+ return; // without consuming the message. |
+ bool started_blob_receive = false; |
+ if (SendFrameInternal(front.fin, front.type, front.data, |
+ &started_blob_receive) == CHANNEL_DELETED) { |
+ // |this| has been destroyed here. |
+ return; |
+ } |
+ if (started_blob_receive) |
+ return; // without consuming the message. |
+ data_frame_queue_.pop(); |
+ } |
+ if (data_frame_queue_.empty()) { |
+ if (pending_drop_channel_ && !ShouldDelayDropChannel()) { |
+ DoDelayedDropChannel(); |
+ // |this| is destroyed here. No need to publish more quota. |
+ return; |
+ } |
+ receive_quota_multiplexer_.PublishMoreQuotaIfAvailable(); |
+ // |this| may be destroyed here. |
+ } |
+} |
+ |
+ChannelState WebSocketHost::SendFrameInternal(bool fin, |
+ OpCode type, |
+ const std::vector<char>& data, |
+ bool* started_blob_receive) { |
+ receive_quota_multiplexer_.ConsumedQuota(data.size()); |
+ if (blob_receiver_) { |
+ DCHECK_NE(type, WebSocketFrameHeader::kOpCodeText); |
+ int rv = blob_receiver_->AppendData(data); |
+ if (rv != net::ERR_IO_PENDING && rv < 0) { |
+ BlobReceiveFailed(rv); |
+ // |this| is destroyed. |
+ return CHANNEL_DELETED; |
+ } |
+ if (fin) { |
+ int rv = blob_receiver_->Finish(); |
+ if (rv != net::ERR_IO_PENDING && rv < 0) { |
+ BlobReceiveFailed(rv); |
+ // |this| is destroyed. |
+ return CHANNEL_DELETED; |
+ } |
+ } |
+ return CHANNEL_ALIVE; |
+ } |
+ |
+ if (type == WebSocketFrameHeader::kOpCodeBinary && |
+ binary_type_ == WebSocketBinaryType::BLOB) { |
+ StartReceivingBlob(fin, data); |
+ *started_blob_receive = true; |
+ return CHANNEL_ALIVE; |
+ } |
+ |
+ return StateCast(dispatcher_->SendFrame(routing_id_, fin, |
+ OpCodeToMessageType(type), data)); |
+} |
+ |
+void WebSocketHost::StartReceivingBlob(bool fin, |
+ const std::vector<char>& data) { |
+ DCHECK(!blob_receiver_); |
+ |
+ blob_receiver_.reset( |
+ new WebSocketBlobReceiver(make_scoped_ptr(new BlobReceiverClient(this)), |
+ dispatcher_->blob_storage_context())); |
+ receive_quota_multiplexer_.SetConsumer(&blob_receiver_quota_); |
+ DCHECK_EQ(receive_quota_multiplexer_.AvailableQuota(), 0u); |
+ blob_receiver_->Start(); |
+} |
+ |
+void WebSocketHost::FinishReceivingBlob( |
+ scoped_ptr<storage::BlobDataHandle> blob_data_handle, |
+ uint64_t size) { |
+ const std::string& uuid = blob_data_handle->uuid(); |
+ unconfirmed_blob_queue_.push(std::move(blob_data_handle)); |
+ if (dispatcher_->BlobReceived(routing_id_, uuid, size) == |
+ WEBSOCKET_HOST_DELETED) { |
+ // |this| has been destroyed. |
+ return; |
+ } |
+ blob_receiver_.reset(); |
+ receive_quota_multiplexer_.SetConsumer(&renderer_quota_); |
+ blob_receiver_quota_ = 0; |
+ FlushQueueAndPublishQuotaIfAvailable(); |
+ // |this| may be destroyed here. |
+} |
+ |
+void WebSocketHost::BlobReceiveFailed(int net_error_code) { |
+ ignore_result(dispatcher_->NotifyFailure( |
+ routing_id_, |
+ "Blob receive failed: " + net::ErrorToString(net_error_code))); |
+ // |this| is destroyed here. |
+} |
+ |
+bool WebSocketHost::ShouldDelayDropChannel() const { |
+ return blob_receiver_ || !data_frame_queue_.empty() || |
+ !unconfirmed_blob_queue_.empty(); |
+} |
+ |
+void WebSocketHost::SetPendingDropChannel(bool was_clean, |
+ uint16_t code, |
+ const std::string& reason) { |
+ DCHECK(!pending_drop_channel_); |
+ pending_drop_channel_.reset( |
+ new DropChannelParameters(was_clean, code, reason)); |
+ // Should not send any more messages on the channel. |
+ blob_sender_.reset(); |
+ // Should not supply any more quota to the channel. |
+ receive_quota_multiplexer_.SetChannel(nullptr); |
+} |
+ |
+void WebSocketHost::DoDelayedDropChannel() { |
+ DCHECK(!ShouldDelayDropChannel()); |
+ scoped_ptr<DropChannelParameters> parameters = |
+ std::move(pending_drop_channel_); |
+ DVLOG(3) |
+ << "WebSocketHost::DoDelayedDropChannel() performing delayed DropChannel" |
+ << " routing_id=" << routing_id_ << " was_clean=" << parameters->was_clean |
+ << " code=" << parameters->code << " reason=\"" << parameters->reason |
+ << "\""; |
+ ignore_result(dispatcher_->DoDropChannel(routing_id_, parameters->was_clean, |
+ parameters->code, |
+ parameters->reason)); |
+ // |this| is destroyed here. |
+} |
+ |
} // namespace content |