| Index: mojo/services/network/web_socket_impl.cc
|
| diff --git a/mojo/services/network/web_socket_impl.cc b/mojo/services/network/web_socket_impl.cc
|
| index f65249686584486a85f72fdb59b5fb97710c8646..1f97dc326f0d7615f17bdd03aeb074716e002aeb 100644
|
| --- a/mojo/services/network/web_socket_impl.cc
|
| +++ b/mojo/services/network/web_socket_impl.cc
|
| @@ -5,7 +5,10 @@
|
| #include "mojo/services/network/web_socket_impl.h"
|
|
|
| #include "base/logging.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "mojo/common/handle_watcher.h"
|
| #include "mojo/services/network/network_context.h"
|
| +#include "mojo/services/network/web_socket_data_pipe_queue.h"
|
| #include "net/websockets/websocket_channel.h"
|
| #include "net/websockets/websocket_errors.h"
|
| #include "net/websockets/websocket_event_interface.h"
|
| @@ -88,7 +91,14 @@ struct WebSocketEventHandler : public net::WebSocketEventInterface {
|
| const net::SSLInfo& ssl_info,
|
| bool fatal) OVERRIDE;
|
|
|
| + // Called once we've written to |receive_stream_|.
|
| + void DidWriteToReceiveStream(bool fin,
|
| + net::WebSocketFrameHeader::OpCode type,
|
| + uint32_t num_bytes,
|
| + const char* buffer);
|
| WebSocketClientPtr client_;
|
| + ScopedDataPipeProducerHandle receive_stream_;
|
| + scoped_ptr<WebSocketWriteQueue> write_queue_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler);
|
| };
|
| @@ -97,7 +107,11 @@ ChannelState WebSocketEventHandler::OnAddChannelResponse(
|
| bool fail,
|
| const std::string& selected_protocol,
|
| const std::string& extensions) {
|
| - client_->DidConnect(fail, selected_protocol, extensions);
|
| + DataPipe data_pipe;
|
| + receive_stream_ = data_pipe.producer_handle.Pass();
|
| + write_queue_.reset(new WebSocketWriteQueue(receive_stream_.get()));
|
| + client_->DidConnect(
|
| + fail, selected_protocol, extensions, data_pipe.consumer_handle.Pass());
|
| if (fail)
|
| return WebSocketEventInterface::CHANNEL_DELETED;
|
| return WebSocketEventInterface::CHANNEL_ALIVE;
|
| @@ -107,20 +121,11 @@ ChannelState WebSocketEventHandler::OnDataFrame(
|
| bool fin,
|
| net::WebSocketFrameHeader::OpCode type,
|
| const std::vector<char>& data) {
|
| - // TODO(mpcomplete): reuse the data pipe for subsequent frames.
|
| - uint32_t num_bytes = static_cast<uint32_t>(data.size());
|
| - MojoCreateDataPipeOptions options;
|
| - options.struct_size = sizeof(MojoCreateDataPipeOptions);
|
| - options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
|
| - options.element_num_bytes = 1;
|
| - options.capacity_num_bytes = num_bytes;
|
| - DataPipe data_pipe(options);
|
| - WriteDataRaw(data_pipe.producer_handle.get(),
|
| - &data[0],
|
| - &num_bytes,
|
| - MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
|
| - client_->DidReceiveData(fin, ConvertTo<WebSocket::MessageType>(type),
|
| - data_pipe.consumer_handle.Pass());
|
| + write_queue_->Write(
|
| + &data[0], data.size(),
|
| + base::Bind(&WebSocketEventHandler::DidWriteToReceiveStream,
|
| + base::Unretained(this),
|
| + fin, type, data.size()));
|
| return WebSocketEventInterface::CHANNEL_ALIVE;
|
| }
|
|
|
| @@ -164,6 +169,15 @@ ChannelState WebSocketEventHandler::OnSSLCertificateError(
|
| return WebSocketEventInterface::CHANNEL_DELETED;
|
| }
|
|
|
| +void WebSocketEventHandler::DidWriteToReceiveStream(
|
| + bool fin,
|
| + net::WebSocketFrameHeader::OpCode type,
|
| + uint32_t num_bytes,
|
| + const char* buffer) {
|
| + client_->DidReceiveData(
|
| + fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
|
| +}
|
| +
|
| } // namespace mojo
|
|
|
| WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) {
|
| @@ -175,8 +189,11 @@ WebSocketImpl::~WebSocketImpl() {
|
| void WebSocketImpl::Connect(const String& url,
|
| Array<String> protocols,
|
| const String& origin,
|
| + ScopedDataPipeConsumerHandle send_stream,
|
| WebSocketClientPtr client) {
|
| DCHECK(!channel_);
|
| + send_stream_ = send_stream.Pass();
|
| + read_queue_.reset(new WebSocketReadQueue(send_stream_.get()));
|
| scoped_ptr<net::WebSocketEventInterface> event_interface(
|
| new WebSocketEventHandler(client.Pass()));
|
| channel_.reset(new net::WebSocketChannel(event_interface.Pass(),
|
| @@ -188,14 +205,12 @@ void WebSocketImpl::Connect(const String& url,
|
|
|
| void WebSocketImpl::Send(bool fin,
|
| WebSocket::MessageType type,
|
| - ScopedDataPipeConsumerHandle data_pipe) {
|
| + uint32_t num_bytes) {
|
| DCHECK(channel_);
|
| - uint32_t num_bytes;
|
| - ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY);
|
| - std::vector<char> data(num_bytes);
|
| - ReadDataRaw(data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE);
|
| - channel_->SendFrame(
|
| - fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), data);
|
| + read_queue_->Read(num_bytes,
|
| + base::Bind(&WebSocketImpl::DidReadFromSendStream,
|
| + base::Unretained(this),
|
| + fin, type, num_bytes));
|
| }
|
|
|
| void WebSocketImpl::FlowControl(int64_t quota) {
|
| @@ -208,4 +223,15 @@ void WebSocketImpl::Close(uint16_t code, const String& reason) {
|
| channel_->StartClosingHandshake(code, reason);
|
| }
|
|
|
| +void WebSocketImpl::DidReadFromSendStream(bool fin,
|
| + WebSocket::MessageType type,
|
| + uint32_t num_bytes,
|
| + const char* data) {
|
| + std::vector<char> buffer(num_bytes);
|
| + memcpy(&buffer[0], data, num_bytes);
|
| + DCHECK(channel_);
|
| + channel_->SendFrame(
|
| + fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), buffer);
|
| +}
|
| +
|
| } // namespace mojo
|
|
|