Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2129)

Unified Diff: content/browser/renderer_host/websocket_host.cc

Issue 1664743002: [OBSOLETE][DO NOT SUBMIT][DO NOT COMMIT]] Browser-side implementation of WebSocket Blob receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@websocket_blob_send_sender
Patch Set: Now actually works. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « content/browser/renderer_host/websocket_host.h ('k') | content/common/websocket.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/browser/renderer_host/websocket_host.cc
diff --git a/content/browser/renderer_host/websocket_host.cc b/content/browser/renderer_host/websocket_host.cc
index d0715889db733e6a55e4187fb5752372b909f85d..4a291a0b51e3103d425e6e79ce202def4dd42428 100644
--- a/content/browser/renderer_host/websocket_host.cc
+++ b/content/browser/renderer_host/websocket_host.cc
@@ -17,6 +17,7 @@
#include "base/strings/stringprintf.h"
#include "base/thread_task_runner_handle.h"
#include "content/browser/bad_message.h"
+#include "content/browser/renderer_host/websocket_blob_receiver.h"
#include "content/browser/renderer_host/websocket_blob_sender.h"
#include "content/browser/renderer_host/websocket_dispatcher_host.h"
#include "content/browser/ssl/ssl_error_handler.h"
@@ -33,7 +34,6 @@
#include "net/ssl/ssl_info.h"
#include "net/websockets/websocket_channel.h"
#include "net/websockets/websocket_errors.h"
-#include "net/websockets/websocket_event_interface.h"
#include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode
#include "net/websockets/websocket_handshake_request_info.h"
#include "net/websockets/websocket_handshake_response_info.h"
@@ -43,55 +43,56 @@ namespace content {
namespace {
-typedef net::WebSocketEventInterface::ChannelState ChannelState;
+using net::WebSocketEventInterface;
+using net::WebSocketFrameHeader;
+using ChannelState = WebSocketEventInterface::ChannelState;
+using OpCode = WebSocketFrameHeader::OpCode;
+
+const auto CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
+const auto CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
+const auto WEBSOCKET_HOST_ALIVE = WebSocketDispatcherHost::WEBSOCKET_HOST_ALIVE;
+const auto WEBSOCKET_HOST_DELETED =
+ WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED;
// Convert a content::WebSocketMessageType to a
-// net::WebSocketFrameHeader::OpCode
-net::WebSocketFrameHeader::OpCode MessageTypeToOpCode(
- WebSocketMessageType type) {
+// WebSocketFrameHeader::OpCode
+OpCode MessageTypeToOpCode(WebSocketMessageType type) {
DCHECK(type == WEB_SOCKET_MESSAGE_TYPE_CONTINUATION ||
type == WEB_SOCKET_MESSAGE_TYPE_TEXT ||
type == WEB_SOCKET_MESSAGE_TYPE_BINARY);
- typedef net::WebSocketFrameHeader::OpCode OpCode;
// These compile asserts verify that the same underlying values are used for
// both types, so we can simply cast between them.
static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_CONTINUATION) ==
- net::WebSocketFrameHeader::kOpCodeContinuation,
+ WebSocketFrameHeader::kOpCodeContinuation,
"enum values must match for opcode continuation");
static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_TEXT) ==
- net::WebSocketFrameHeader::kOpCodeText,
+ WebSocketFrameHeader::kOpCodeText,
"enum values must match for opcode text");
static_assert(static_cast<OpCode>(WEB_SOCKET_MESSAGE_TYPE_BINARY) ==
- net::WebSocketFrameHeader::kOpCodeBinary,
+ WebSocketFrameHeader::kOpCodeBinary,
"enum values must match for opcode binary");
return static_cast<OpCode>(type);
}
-WebSocketMessageType OpCodeToMessageType(
- net::WebSocketFrameHeader::OpCode opCode) {
- DCHECK(opCode == net::WebSocketFrameHeader::kOpCodeContinuation ||
- opCode == net::WebSocketFrameHeader::kOpCodeText ||
- opCode == net::WebSocketFrameHeader::kOpCodeBinary);
+WebSocketMessageType OpCodeToMessageType(OpCode opCode) {
+ DCHECK(opCode == WebSocketFrameHeader::kOpCodeContinuation ||
+ opCode == WebSocketFrameHeader::kOpCodeText ||
+ opCode == WebSocketFrameHeader::kOpCodeBinary);
// This cast is guaranteed valid by the static_assert() statements above.
return static_cast<WebSocketMessageType>(opCode);
}
ChannelState StateCast(WebSocketDispatcherHost::WebSocketHostState host_state) {
- const WebSocketDispatcherHost::WebSocketHostState WEBSOCKET_HOST_ALIVE =
- WebSocketDispatcherHost::WEBSOCKET_HOST_ALIVE;
- const WebSocketDispatcherHost::WebSocketHostState WEBSOCKET_HOST_DELETED =
- WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED;
-
DCHECK(host_state == WEBSOCKET_HOST_ALIVE ||
host_state == WEBSOCKET_HOST_DELETED);
// These compile asserts verify that we can get away with using static_cast<>
// for the conversion.
- static_assert(static_cast<ChannelState>(WEBSOCKET_HOST_ALIVE) ==
- net::WebSocketEventInterface::CHANNEL_ALIVE,
- "enum values must match for state_alive");
- static_assert(static_cast<ChannelState>(WEBSOCKET_HOST_DELETED) ==
- net::WebSocketEventInterface::CHANNEL_DELETED,
- "enum values must match for state_deleted");
+ static_assert(
+ static_cast<ChannelState>(WEBSOCKET_HOST_ALIVE) == CHANNEL_ALIVE,
+ "enum values must match for state_alive");
+ static_assert(
+ static_cast<ChannelState>(WEBSOCKET_HOST_DELETED) == CHANNEL_DELETED,
+ "enum values must match for state_deleted");
return static_cast<ChannelState>(host_state);
}
@@ -107,8 +108,8 @@ class SendChannelImpl final : public WebSocketBlobSender::Channel {
}
ChannelState SendFrame(bool fin, const std::vector<char>& data) override {
- int opcode = first_frame_ ? net::WebSocketFrameHeader::kOpCodeBinary
- : net::WebSocketFrameHeader::kOpCodeContinuation;
+ int opcode = first_frame_ ? WebSocketFrameHeader::kOpCodeBinary
+ : WebSocketFrameHeader::kOpCodeContinuation;
first_frame_ = false;
return channel_->SendFrame(fin, opcode, data);
}
@@ -152,7 +153,7 @@ class WebSocketHost::WebSocketEventHandler final
ChannelState OnFinishOpeningHandshake(
scoped_ptr<net::WebSocketHandshakeResponseInfo> response) override;
ChannelState OnSSLCertificateError(
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks,
const GURL& url,
const net::SSLInfo& ssl_info,
bool fatal) override;
@@ -161,7 +162,7 @@ class WebSocketHost::WebSocketEventHandler final
class SSLErrorHandlerDelegate final : public SSLErrorHandler::Delegate {
public:
SSLErrorHandlerDelegate(
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks);
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks);
~SSLErrorHandlerDelegate() override;
base::WeakPtr<SSLErrorHandler::Delegate> GetWeakPtr();
@@ -171,7 +172,7 @@ class WebSocketHost::WebSocketEventHandler final
void ContinueSSLRequest() override;
private:
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks_;
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks_;
base::WeakPtrFactory<SSLErrorHandlerDelegate> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(SSLErrorHandlerDelegate);
@@ -204,8 +205,8 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnAddChannelResponse(
const std::string& selected_protocol,
const std::string& extensions) {
DVLOG(3) << "WebSocketEventHandler::OnAddChannelResponse"
- << " routing_id=" << routing_id_
- << " selected_protocol=\"" << selected_protocol << "\""
+ << " routing_id=" << routing_id_ << " selected_protocol=\""
+ << selected_protocol << "\""
<< " extensions=\"" << extensions << "\"";
return StateCast(dispatcher_->SendAddChannelResponse(
@@ -214,14 +215,27 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnAddChannelResponse(
ChannelState WebSocketHost::WebSocketEventHandler::OnDataFrame(
bool fin,
- net::WebSocketFrameHeader::OpCode type,
+ OpCode type,
const std::vector<char>& data) {
DVLOG(3) << "WebSocketEventHandler::OnDataFrame"
<< " routing_id=" << routing_id_ << " fin=" << fin
<< " type=" << type << " data is " << data.size() << " bytes";
- return StateCast(dispatcher_->SendFrame(routing_id_, fin,
- OpCodeToMessageType(type), data));
+ host_->receive_quota_multiplexer_.ReceivedFrame(data.size());
+ if (host_->ShouldQueue(data.size())) {
+ host_->AppendToQueue(fin, type, data);
+ return CHANNEL_ALIVE;
+ }
+
+ bool started_blob_receive = false;
+ if (host_->SendFrameInternal(fin, type, data, &started_blob_receive) ==
+ CHANNEL_DELETED) {
+ // |this| has been destroyed here.
+ return CHANNEL_DELETED;
+ }
+ if (started_blob_receive)
+ host_->AppendToQueue(fin, type, data);
+ return CHANNEL_ALIVE;
}
ChannelState WebSocketHost::WebSocketEventHandler::OnClosingHandshake() {
@@ -249,6 +263,11 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnDropChannel(
<< " routing_id=" << routing_id_ << " was_clean=" << was_clean
<< " code=" << code << " reason=\"" << reason << "\"";
+ if (host_->ShouldDelayDropChannel()) {
+ DVLOG(3) << "WebSocketEventHandler::OnDropChannel delayed";
+ host_->SetPendingDropChannel(was_clean, code, reason);
+ return CHANNEL_ALIVE;
+ }
return StateCast(
dispatcher_->DoDropChannel(routing_id_, was_clean, code, reason));
}
@@ -268,7 +287,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnStartOpeningHandshake(
<< "should_send=" << should_send;
if (!should_send)
- return WebSocketEventInterface::CHANNEL_ALIVE;
+ return CHANNEL_ALIVE;
WebSocketHandshakeRequest request_to_pass;
request_to_pass.url.Swap(&request->url);
@@ -292,7 +311,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnFinishOpeningHandshake(
<< "should_send=" << should_send;
if (!should_send)
- return WebSocketEventInterface::CHANNEL_ALIVE;
+ return CHANNEL_ALIVE;
WebSocketHandshakeResponse response_to_pass;
response_to_pass.url.Swap(&response->url);
@@ -312,7 +331,7 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnFinishOpeningHandshake(
}
ChannelState WebSocketHost::WebSocketEventHandler::OnSSLCertificateError(
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks,
const GURL& url,
const net::SSLInfo& ssl_info,
bool fatal) {
@@ -325,12 +344,12 @@ ChannelState WebSocketHost::WebSocketEventHandler::OnSSLCertificateError(
ssl_error_handler_delegate_->GetWeakPtr(), url,
dispatcher_->render_process_id(), render_frame_id_, ssl_info, fatal);
// The above method is always asynchronous.
- return WebSocketEventInterface::CHANNEL_ALIVE;
+ return CHANNEL_ALIVE;
}
WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate::
SSLErrorHandlerDelegate(
- scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks)
+ scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> callbacks)
: callbacks_(std::move(callbacks)), weak_ptr_factory_(this) {}
WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate::
@@ -356,6 +375,123 @@ void WebSocketHost::WebSocketEventHandler::SSLErrorHandlerDelegate::
callbacks_->ContinueSSLRequest();
}
+WebSocketHost::ReceiveQuotaMultiplexer::ReceiveQuotaMultiplexer() {}
+
+void WebSocketHost::ReceiveQuotaMultiplexer::SetChannel(
+ net::WebSocketChannel* channel) {
+ channel_ = channel;
+ if (!channel) {
+ channel_quota_ = 0;
+ }
+}
+
+void WebSocketHost::ReceiveQuotaMultiplexer::SetConsumer(
+ ReceiveQuotaConsumer* consumer) {
+ DCHECK(consumer);
+ current_consumer_ = consumer;
+}
+
+// Quota can be added to any consumer at any time.
+bool WebSocketHost::ReceiveQuotaMultiplexer::AddQuota(
+ ReceiveQuotaProvider* provider,
+ size_t quota) {
+ *provider += quota;
+ return provider == current_consumer_;
+}
+
+// Quota can only be used from the current consumer.
+size_t WebSocketHost::ReceiveQuotaMultiplexer::AvailableQuota() const {
+ DCHECK(current_consumer_);
+ return *current_consumer_;
+}
+
+// Quota is only consumed by the current consumer.
+void WebSocketHost::ReceiveQuotaMultiplexer::ConsumedQuota(size_t quota) {
+ DCHECK_GE(*current_consumer_, quota);
+ *current_consumer_ -= quota;
+}
+
+void WebSocketHost::ReceiveQuotaMultiplexer::ReceivedFrame(size_t size) {
+ DCHECK(channel_);
+ DCHECK_GE(channel_quota_, size);
+ channel_quota_ -= size;
+}
+
+void WebSocketHost::ReceiveQuotaMultiplexer::PublishMoreQuotaIfAvailable() {
+ if (!channel_)
+ return;
+ DCHECK(current_consumer_);
+ if (*current_consumer_ > channel_quota_) {
+ size_t additional_quota =
+ *current_consumer_ - channel_quota_;
+ channel_quota_ = *current_consumer_;
+ // SendFlowControl may call back into OnDataFrame, so order matters here.
+ channel_->SendFlowControl(additional_quota);
+ }
+}
+
+WebSocketHost::QueuedFrame::QueuedFrame(bool fin,
+ OpCode type,
+ const std::vector<char>& data)
+ : fin(fin), type(type), data(data) {}
+
+WebSocketHost::QueuedFrame::QueuedFrame(QueuedFrame&& rhs)
+ : fin(rhs.fin), type(rhs.type), data(std::move(rhs.data)) {}
+
+WebSocketHost::QueuedFrame::~QueuedFrame() {}
+
+WebSocketHost::QueuedFrame& WebSocketHost::QueuedFrame::operator=(
+ QueuedFrame&& rhs) {
+ if (this == &rhs)
+ return *this;
+ fin = rhs.fin;
+ type = rhs.type;
+ data = std::move(rhs.data);
+ return *this;
+}
+
+struct WebSocketHost::DropChannelParameters {
+ DropChannelParameters(bool was_clean,
+ uint16_t code,
+ const std::string& reason)
+ : was_clean(was_clean), code(code), reason(reason) {}
+
+ bool was_clean;
+ uint16_t code;
+ std::string reason;
+};
+
+class WebSocketHost::BlobReceiverClient final
+ : public WebSocketBlobReceiver::Client {
+ public:
+ explicit BlobReceiverClient(WebSocketHost* host) : host_(host) {}
+
+ // Implementation of WebSocketBlobReceiver::Client
+ void BlobCreated(scoped_ptr<storage::BlobDataHandle> blob_data_handle,
+ uint64_t size) override {
+ host_->FinishReceivingBlob(std::move(blob_data_handle), size);
+ // |this| may be destroyed here.
+ }
+
+ void BlobFailed(int net_error_code) override {
+ host_->BlobReceiveFailed(net_error_code);
+ // |this| is destroyed here.
+ }
+
+ void AddFlowControlQuota(size_t quota) override {
+ if (host_->receive_quota_multiplexer_.AddQuota(&host_->blob_receiver_quota_,
+ quota)) {
+ host_->FlushQueueAndPublishQuotaIfAvailable();
+ // |this| may be destroyed here.
+ }
+ }
+
+ private:
+ WebSocketHost* host_;
+
+ DISALLOW_COPY_AND_ASSIGN(BlobReceiverClient);
+};
+
WebSocketHost::WebSocketHost(int routing_id,
WebSocketDispatcherHost* dispatcher,
net::URLRequestContext* url_request_context,
@@ -366,6 +502,9 @@ WebSocketHost::WebSocketHost(int routing_id,
delay_(delay),
pending_flow_control_quota_(0),
handshake_succeeded_(false),
+ binary_type_(WebSocketBinaryType::BLOB),
+ renderer_quota_(0),
+ blob_receiver_quota_(0),
weak_ptr_factory_(this) {
DVLOG(1) << "WebSocketHost: created routing_id=" << routing_id;
}
@@ -382,6 +521,8 @@ bool WebSocketHost::OnMessageReceived(const IPC::Message& message) {
IPC_BEGIN_MESSAGE_MAP(WebSocketHost, message)
IPC_MESSAGE_HANDLER(WebSocketHostMsg_AddChannelRequest, OnAddChannelRequest)
IPC_MESSAGE_HANDLER(WebSocketHostMsg_SendBlob, OnSendBlob)
+ IPC_MESSAGE_HANDLER(WebSocketHostMsg_BinaryTypeChanged, OnBinaryTypeChanged)
+ IPC_MESSAGE_HANDLER(WebSocketHostMsg_BlobConfirmed, OnBlobConfirmed)
IPC_MESSAGE_HANDLER(WebSocketMsg_SendFrame, OnSendFrame)
IPC_MESSAGE_HANDLER(WebSocketMsg_FlowControl, OnFlowControl)
IPC_MESSAGE_HANDLER(WebSocketMsg_DropChannel, OnDropChannel)
@@ -445,6 +586,8 @@ void WebSocketHost::AddChannel(
pending_flow_control_quota_));
pending_flow_control_quota_ = 0;
}
+ receive_quota_multiplexer_.SetChannel(channel_.get());
+ receive_quota_multiplexer_.SetConsumer(&renderer_quota_);
channel_->SendAddChannelRequest(socket_url, requested_protocols, origin);
// |this| may have been deleted here.
@@ -468,8 +611,7 @@ void WebSocketHost::OnSendBlob(const std::string& uuid,
storage::FileSystemContext* file_system_context =
partition->GetFileSystemContext();
- net::WebSocketEventInterface::ChannelState channel_state =
- net::WebSocketEventInterface::CHANNEL_ALIVE;
+ ChannelState channel_state = CHANNEL_ALIVE;
// This use of base::Unretained is safe because the WebSocketBlobSender object
// is owned by this object and will not call it back after destruction.
@@ -479,8 +621,7 @@ void WebSocketHost::OnSendBlob(const std::string& uuid,
BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE).get(),
&channel_state,
base::Bind(&WebSocketHost::BlobSendComplete, base::Unretained(this)));
- if (channel_state == net::WebSocketEventInterface::CHANNEL_ALIVE &&
- rv != net::ERR_IO_PENDING)
+ if (channel_state == CHANNEL_ALIVE && rv != net::ERR_IO_PENDING)
BlobSendComplete(rv);
// |this| may be destroyed here.
}
@@ -492,6 +633,9 @@ void WebSocketHost::OnSendFrame(bool fin,
<< " routing_id=" << routing_id_ << " fin=" << fin
<< " type=" << type << " data is " << data.size() << " bytes";
+ if (pending_drop_channel_)
+ return;
+
DCHECK(channel_);
if (blob_sender_) {
bad_message::ReceivedBadMessage(
@@ -504,6 +648,11 @@ void WebSocketHost::OnSendFrame(bool fin,
void WebSocketHost::OnFlowControl(int64_t quota) {
DVLOG(3) << "WebSocketHost::OnFlowControl"
<< " routing_id=" << routing_id_ << " quota=" << quota;
+ if (quota < 0) {
+ bad_message::ReceivedBadMessage(dispatcher_,
+ bad_message::WSH_NEGATIVE_QUOTA);
+ return;
+ }
if (!channel_) {
// WebSocketChannel is not yet created due to the delay introduced by
@@ -513,7 +662,10 @@ void WebSocketHost::OnFlowControl(int64_t quota) {
return;
}
- channel_->SendFlowControl(quota);
+ if (receive_quota_multiplexer_.AddQuota(&renderer_quota_, quota)) {
+ FlushQueueAndPublishQuotaIfAvailable();
+ // |this| may be destroyed here.
+ }
}
void WebSocketHost::OnDropChannel(bool was_clean,
@@ -529,15 +681,42 @@ void WebSocketHost::OnDropChannel(bool was_clean,
WebSocketDispatcherHost::WebSocketHostState result =
dispatcher_->DoDropChannel(routing_id_, false,
net::kWebSocketErrorAbnormalClosure, "");
- DCHECK_EQ(WebSocketDispatcherHost::WEBSOCKET_HOST_DELETED, result);
+ DCHECK_EQ(WEBSOCKET_HOST_DELETED, result);
return;
}
+ if (pending_drop_channel_)
+ return;
+
blob_sender_.reset();
// TODO(yhirano): Handle |was_clean| appropriately.
channel_->StartClosingHandshake(code, reason);
}
+void WebSocketHost::OnBinaryTypeChanged(WebSocketBinaryType new_type) {
+ DVLOG(3) << "WebSocketHost::OnSetBinaryType"
+ << " routing_id= " << routing_id_
+ << " new_type=" << static_cast<int>(new_type);
+
+ binary_type_ = new_type;
+}
+
+void WebSocketHost::OnBlobConfirmed() {
+ DVLOG(3) << "WebSocketHost::OnBlobConfirmed"
+ << " routing_id= " << routing_id_;
+
+ if (unconfirmed_blob_queue_.empty()) {
+ ReceivedBadMessage(dispatcher_,
+ bad_message::WSH_UNSOLICITED_BLOB_CONFIRMATION);
+ return;
+ }
+ unconfirmed_blob_queue_.pop();
+ if (pending_drop_channel_ && !ShouldDelayDropChannel()) {
+ DoDelayedDropChannel();
+ // |this| is destroyed here.
+ }
+}
+
void WebSocketHost::BlobSendComplete(int result) {
DVLOG(3) << "WebSocketHost::BlobSendComplete"
<< " routing_id=" << routing_id_
@@ -575,4 +754,147 @@ void WebSocketHost::BlobSendComplete(int result) {
}
}
+bool WebSocketHost::ShouldQueue(size_t data_size) {
+ return (!data_frame_queue_.empty() ||
+ receive_quota_multiplexer_.AvailableQuota() < data_size ||
+ (blob_receiver_ && blob_receiver_->finish_called()));
+}
+
+void WebSocketHost::AppendToQueue(bool fin,
+ OpCode type,
+ const std::vector<char>& data) {
+ data_frame_queue_.emplace(fin, type, data);
+}
+
+void WebSocketHost::FlushQueueAndPublishQuotaIfAvailable() {
+ while (!data_frame_queue_.empty() &&
+ receive_quota_multiplexer_.AvailableQuota() >=
+ data_frame_queue_.front().data.size()) {
+ const QueuedFrame& front = data_frame_queue_.front();
+ if (blob_receiver_ && blob_receiver_->finish_called())
+ return; // without consuming the message.
+ bool started_blob_receive = false;
+ if (SendFrameInternal(front.fin, front.type, front.data,
+ &started_blob_receive) == CHANNEL_DELETED) {
+ // |this| has been destroyed here.
+ return;
+ }
+ if (started_blob_receive)
+ return; // without consuming the message.
+ data_frame_queue_.pop();
+ }
+ if (data_frame_queue_.empty()) {
+ if (pending_drop_channel_ && !ShouldDelayDropChannel()) {
+ DoDelayedDropChannel();
+ // |this| is destroyed here. No need to publish more quota.
+ return;
+ }
+ receive_quota_multiplexer_.PublishMoreQuotaIfAvailable();
+ // |this| may be destroyed here.
+ }
+}
+
+ChannelState WebSocketHost::SendFrameInternal(bool fin,
+ OpCode type,
+ const std::vector<char>& data,
+ bool* started_blob_receive) {
+ receive_quota_multiplexer_.ConsumedQuota(data.size());
+ if (blob_receiver_) {
+ DCHECK_NE(type, WebSocketFrameHeader::kOpCodeText);
+ int rv = blob_receiver_->AppendData(data);
+ if (rv != net::ERR_IO_PENDING && rv < 0) {
+ BlobReceiveFailed(rv);
+ // |this| is destroyed.
+ return CHANNEL_DELETED;
+ }
+ if (fin) {
+ int rv = blob_receiver_->Finish();
+ if (rv != net::ERR_IO_PENDING && rv < 0) {
+ BlobReceiveFailed(rv);
+ // |this| is destroyed.
+ return CHANNEL_DELETED;
+ }
+ }
+ return CHANNEL_ALIVE;
+ }
+
+ if (type == WebSocketFrameHeader::kOpCodeBinary &&
+ binary_type_ == WebSocketBinaryType::BLOB) {
+ StartReceivingBlob(fin, data);
+ *started_blob_receive = true;
+ return CHANNEL_ALIVE;
+ }
+
+ return StateCast(dispatcher_->SendFrame(routing_id_, fin,
+ OpCodeToMessageType(type), data));
+}
+
+void WebSocketHost::StartReceivingBlob(bool fin,
+ const std::vector<char>& data) {
+ DCHECK(!blob_receiver_);
+
+ blob_receiver_.reset(
+ new WebSocketBlobReceiver(make_scoped_ptr(new BlobReceiverClient(this)),
+ dispatcher_->blob_storage_context()));
+ receive_quota_multiplexer_.SetConsumer(&blob_receiver_quota_);
+ DCHECK_EQ(receive_quota_multiplexer_.AvailableQuota(), 0u);
+ blob_receiver_->Start();
+}
+
+void WebSocketHost::FinishReceivingBlob(
+ scoped_ptr<storage::BlobDataHandle> blob_data_handle,
+ uint64_t size) {
+ const std::string& uuid = blob_data_handle->uuid();
+ unconfirmed_blob_queue_.push(std::move(blob_data_handle));
+ if (dispatcher_->BlobReceived(routing_id_, uuid, size) ==
+ WEBSOCKET_HOST_DELETED) {
+ // |this| has been destroyed.
+ return;
+ }
+ blob_receiver_.reset();
+ receive_quota_multiplexer_.SetConsumer(&renderer_quota_);
+ blob_receiver_quota_ = 0;
+ FlushQueueAndPublishQuotaIfAvailable();
+ // |this| may be destroyed here.
+}
+
+void WebSocketHost::BlobReceiveFailed(int net_error_code) {
+ ignore_result(dispatcher_->NotifyFailure(
+ routing_id_,
+ "Blob receive failed: " + net::ErrorToString(net_error_code)));
+ // |this| is destroyed here.
+}
+
+bool WebSocketHost::ShouldDelayDropChannel() const {
+ return blob_receiver_ || !data_frame_queue_.empty() ||
+ !unconfirmed_blob_queue_.empty();
+}
+
+void WebSocketHost::SetPendingDropChannel(bool was_clean,
+ uint16_t code,
+ const std::string& reason) {
+ DCHECK(!pending_drop_channel_);
+ pending_drop_channel_.reset(
+ new DropChannelParameters(was_clean, code, reason));
+ // Should not send any more messages on the channel.
+ blob_sender_.reset();
+ // Should not supply any more quota to the channel.
+ receive_quota_multiplexer_.SetChannel(nullptr);
+}
+
+void WebSocketHost::DoDelayedDropChannel() {
+ DCHECK(!ShouldDelayDropChannel());
+ scoped_ptr<DropChannelParameters> parameters =
+ std::move(pending_drop_channel_);
+ DVLOG(3)
+ << "WebSocketHost::DoDelayedDropChannel() performing delayed DropChannel"
+ << " routing_id=" << routing_id_ << " was_clean=" << parameters->was_clean
+ << " code=" << parameters->code << " reason=\"" << parameters->reason
+ << "\"";
+ ignore_result(dispatcher_->DoDropChannel(routing_id_, parameters->was_clean,
+ parameters->code,
+ parameters->reason));
+ // |this| is destroyed here.
+}
+
} // namespace content
« no previous file with comments | « content/browser/renderer_host/websocket_host.h ('k') | content/common/websocket.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698