Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1582)

Unified Diff: mojo/services/network/web_socket_impl.cc

Issue 550003005: Mojo: WebSocket interface now reuses the DataPipe for subsequent sends or (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: give up Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/services/network/web_socket_impl.cc
diff --git a/mojo/services/network/web_socket_impl.cc b/mojo/services/network/web_socket_impl.cc
index f65249686584486a85f72fdb59b5fb97710c8646..7ba9b22da9657457dc87812e099919a95bab50dc 100644
--- a/mojo/services/network/web_socket_impl.cc
+++ b/mojo/services/network/web_socket_impl.cc
@@ -5,7 +5,11 @@
#include "mojo/services/network/web_socket_impl.h"
#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "mojo/common/handle_watcher.h"
#include "mojo/services/network/network_context.h"
+#include "mojo/services/public/cpp/network/web_socket_read_queue.h"
+#include "mojo/services/public/cpp/network/web_socket_write_queue.h"
#include "net/websockets/websocket_channel.h"
#include "net/websockets/websocket_errors.h"
#include "net/websockets/websocket_event_interface.h"
@@ -88,7 +92,14 @@ struct WebSocketEventHandler : public net::WebSocketEventInterface {
const net::SSLInfo& ssl_info,
bool fatal) OVERRIDE;
+ // Called once we've written to |receive_stream_|.
+ void DidWriteToReceiveStream(bool fin,
+ net::WebSocketFrameHeader::OpCode type,
+ uint32_t num_bytes,
+ const char* buffer);
WebSocketClientPtr client_;
+ ScopedDataPipeProducerHandle receive_stream_;
+ scoped_ptr<WebSocketWriteQueue> write_queue_;
DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler);
};
@@ -97,7 +108,11 @@ ChannelState WebSocketEventHandler::OnAddChannelResponse(
bool fail,
const std::string& selected_protocol,
const std::string& extensions) {
- client_->DidConnect(fail, selected_protocol, extensions);
+ DataPipe data_pipe;
+ receive_stream_ = data_pipe.producer_handle.Pass();
+ write_queue_.reset(new WebSocketWriteQueue(receive_stream_.get()));
+ client_->DidConnect(
+ fail, selected_protocol, extensions, data_pipe.consumer_handle.Pass());
if (fail)
return WebSocketEventInterface::CHANNEL_DELETED;
return WebSocketEventInterface::CHANNEL_ALIVE;
@@ -107,20 +122,12 @@ ChannelState WebSocketEventHandler::OnDataFrame(
bool fin,
net::WebSocketFrameHeader::OpCode type,
const std::vector<char>& data) {
- // TODO(mpcomplete): reuse the data pipe for subsequent frames.
- uint32_t num_bytes = static_cast<uint32_t>(data.size());
- MojoCreateDataPipeOptions options;
- options.struct_size = sizeof(MojoCreateDataPipeOptions);
- options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
- options.element_num_bytes = 1;
- options.capacity_num_bytes = num_bytes;
- DataPipe data_pipe(options);
- WriteDataRaw(data_pipe.producer_handle.get(),
- &data[0],
- &num_bytes,
- MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
- client_->DidReceiveData(fin, ConvertTo<WebSocket::MessageType>(type),
- data_pipe.consumer_handle.Pass());
+ uint32_t size = static_cast<uint32_t>(data.size());
+ write_queue_->Write(
+ &data[0], size,
+ base::Bind(&WebSocketEventHandler::DidWriteToReceiveStream,
+ base::Unretained(this),
+ fin, type, size));
return WebSocketEventInterface::CHANNEL_ALIVE;
}
@@ -164,6 +171,15 @@ ChannelState WebSocketEventHandler::OnSSLCertificateError(
return WebSocketEventInterface::CHANNEL_DELETED;
}
+void WebSocketEventHandler::DidWriteToReceiveStream(
+ bool fin,
+ net::WebSocketFrameHeader::OpCode type,
+ uint32_t num_bytes,
+ const char* buffer) {
+ client_->DidReceiveData(
+ fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
+}
+
} // namespace mojo
WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) {
@@ -175,8 +191,11 @@ WebSocketImpl::~WebSocketImpl() {
void WebSocketImpl::Connect(const String& url,
Array<String> protocols,
const String& origin,
+ ScopedDataPipeConsumerHandle send_stream,
WebSocketClientPtr client) {
DCHECK(!channel_);
+ send_stream_ = send_stream.Pass();
+ read_queue_.reset(new WebSocketReadQueue(send_stream_.get()));
scoped_ptr<net::WebSocketEventInterface> event_interface(
new WebSocketEventHandler(client.Pass()));
channel_.reset(new net::WebSocketChannel(event_interface.Pass(),
@@ -188,14 +207,12 @@ void WebSocketImpl::Connect(const String& url,
void WebSocketImpl::Send(bool fin,
WebSocket::MessageType type,
- ScopedDataPipeConsumerHandle data_pipe) {
+ uint32_t num_bytes) {
DCHECK(channel_);
- uint32_t num_bytes;
- ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY);
- std::vector<char> data(num_bytes);
- ReadDataRaw(data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE);
- channel_->SendFrame(
- fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), data);
+ read_queue_->Read(num_bytes,
+ base::Bind(&WebSocketImpl::DidReadFromSendStream,
+ base::Unretained(this),
+ fin, type, num_bytes));
}
void WebSocketImpl::FlowControl(int64_t quota) {
@@ -208,4 +225,15 @@ void WebSocketImpl::Close(uint16_t code, const String& reason) {
channel_->StartClosingHandshake(code, reason);
}
+void WebSocketImpl::DidReadFromSendStream(bool fin,
+ WebSocket::MessageType type,
+ uint32_t num_bytes,
+ const char* data) {
+ std::vector<char> buffer(num_bytes);
+ memcpy(&buffer[0], data, num_bytes);
+ DCHECK(channel_);
+ channel_->SendFrame(
+ fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), buffer);
+}
+
} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698