| Index: mojo/services/html_viewer/websockethandle_impl.cc
|
| diff --git a/mojo/services/html_viewer/websockethandle_impl.cc b/mojo/services/html_viewer/websockethandle_impl.cc
|
| index e688060221d3506ed2dbc2c17f9e72eda9087c84..3dcb65f6b9f89693a9d9e9c8672f4299c100ecf1 100644
|
| --- a/mojo/services/html_viewer/websockethandle_impl.cc
|
| +++ b/mojo/services/html_viewer/websockethandle_impl.cc
|
| @@ -6,7 +6,11 @@
|
|
|
| #include <vector>
|
|
|
| +#include "base/bind.h"
|
| +#include "base/memory/scoped_vector.h"
|
| #include "mojo/services/html_viewer/blink_basic_type_converters.h"
|
| +#include "mojo/services/public/cpp/network/web_socket_read_queue.h"
|
| +#include "mojo/services/public/cpp/network/web_socket_write_queue.h"
|
| #include "mojo/services/public/interfaces/network/network_service.mojom.h"
|
| #include "third_party/WebKit/public/platform/WebSerializedOrigin.h"
|
| #include "third_party/WebKit/public/platform/WebSocketHandleClient.h"
|
| @@ -67,12 +71,15 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> {
|
|
|
| private:
|
| // WebSocketClient methods:
|
| - virtual void DidConnect(
|
| - bool fail,
|
| - const String& selected_subprotocol,
|
| - const String& extensions) OVERRIDE {
|
| + virtual void DidConnect(bool fail,
|
| + const String& selected_subprotocol,
|
| + const String& extensions,
|
| + ScopedDataPipeConsumerHandle receive_stream)
|
| + OVERRIDE {
|
| blink::WebSocketHandleClient* client = client_;
|
| WebSocketHandleImpl* handle = handle_;
|
| + receive_stream_ = receive_stream.Pass();
|
| + read_queue_.reset(new WebSocketReadQueue(receive_stream_.get()));
|
| if (fail)
|
| handle->Disconnect(); // deletes |this|
|
| client->didConnect(handle,
|
| @@ -84,19 +91,11 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> {
|
|
|
| virtual void DidReceiveData(bool fin,
|
| WebSocket::MessageType type,
|
| - ScopedDataPipeConsumerHandle data_pipe) OVERRIDE {
|
| - uint32_t num_bytes;
|
| - ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY);
|
| - std::vector<char> data(num_bytes);
|
| - ReadDataRaw(
|
| - data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE);
|
| - const char* data_ptr = data.empty() ? NULL : &data[0];
|
| - client_->didReceiveData(handle_,
|
| - fin,
|
| - ConvertTo<WebSocketHandle::MessageType>(type),
|
| - data_ptr,
|
| - data.size());
|
| - // |handle| can be deleted here.
|
| + uint32_t num_bytes) OVERRIDE {
|
| + read_queue_->Read(num_bytes,
|
| + base::Bind(&WebSocketClientImpl::DidReadFromReceiveStream,
|
| + base::Unretained(this),
|
| + fin, type, num_bytes));
|
| }
|
|
|
| virtual void DidReceiveFlowControl(int64_t quota) OVERRIDE {
|
| @@ -122,8 +121,23 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> {
|
| // |handle| can be deleted here.
|
| }
|
|
|
| + void DidReadFromReceiveStream(bool fin,
|
| + WebSocket::MessageType type,
|
| + uint32_t num_bytes,
|
| + const char* data) {
|
| + client_->didReceiveData(handle_,
|
| + fin,
|
| + ConvertTo<WebSocketHandle::MessageType>(type),
|
| + data,
|
| + num_bytes);
|
| + // |handle_| can be deleted here.
|
| + }
|
| +
|
| + // |handle_| owns this object, so it is guaranteed to outlive us.
|
| WebSocketHandleImpl* handle_;
|
| blink::WebSocketHandleClient* client_;
|
| + ScopedDataPipeConsumerHandle receive_stream_;
|
| + scoped_ptr<WebSocketReadQueue> read_queue_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl);
|
| };
|
| @@ -150,9 +164,14 @@ void WebSocketHandleImpl::connect(const WebURL& url,
|
| // TODO(mpcomplete): Is this the right ownership model? Or should mojo own
|
| // |client_|?
|
| WeakBindToProxy(client_.get(), &client_ptr);
|
| +
|
| + DataPipe data_pipe;
|
| + send_stream_ = data_pipe.producer_handle.Pass();
|
| + write_queue_.reset(new WebSocketWriteQueue(send_stream_.get()));
|
| web_socket_->Connect(url.string().utf8(),
|
| Array<String>::From(protocols),
|
| origin.string().utf8(),
|
| + data_pipe.consumer_handle.Pass(),
|
| client_ptr.Pass());
|
| }
|
|
|
| @@ -163,22 +182,12 @@ void WebSocketHandleImpl::send(bool fin,
|
| if (!client_)
|
| return;
|
|
|
| - // TODO(mpcomplete): reuse the data pipe for subsequent sends.
|
| - uint32_t num_bytes = static_cast<uint32_t>(size);
|
| - MojoCreateDataPipeOptions options;
|
| - options.struct_size = sizeof(MojoCreateDataPipeOptions);
|
| - options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
|
| - options.element_num_bytes = 1;
|
| - options.capacity_num_bytes = num_bytes;
|
| - DataPipe data_pipe(options);
|
| - WriteDataRaw(data_pipe.producer_handle.get(),
|
| - data,
|
| - &num_bytes,
|
| - MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
|
| - web_socket_->Send(
|
| - fin,
|
| - ConvertTo<WebSocket::MessageType>(type),
|
| - data_pipe.consumer_handle.Pass());
|
| + uint32_t size32 = static_cast<uint32_t>(size);
|
| + write_queue_->Write(
|
| + data, size32,
|
| + base::Bind(&WebSocketHandleImpl::DidWriteToSendStream,
|
| + base::Unretained(this),
|
| + fin, type, size32));
|
| }
|
|
|
| void WebSocketHandleImpl::flowControl(int64_t quota) {
|
| @@ -192,6 +201,14 @@ void WebSocketHandleImpl::close(unsigned short code, const WebString& reason) {
|
| web_socket_->Close(code, reason.utf8());
|
| }
|
|
|
| +void WebSocketHandleImpl::DidWriteToSendStream(
|
| + bool fin,
|
| + WebSocketHandle::MessageType type,
|
| + uint32_t num_bytes,
|
| + const char* data) {
|
| + web_socket_->Send(fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
|
| +}
|
| +
|
| void WebSocketHandleImpl::Disconnect() {
|
| did_close_ = true;
|
| client_.reset();
|
|
|