Index: mojo/services/html_viewer/websockethandle_impl.cc |
diff --git a/mojo/services/html_viewer/websockethandle_impl.cc b/mojo/services/html_viewer/websockethandle_impl.cc |
index e688060221d3506ed2dbc2c17f9e72eda9087c84..3dcb65f6b9f89693a9d9e9c8672f4299c100ecf1 100644 |
--- a/mojo/services/html_viewer/websockethandle_impl.cc |
+++ b/mojo/services/html_viewer/websockethandle_impl.cc |
@@ -6,7 +6,11 @@ |
#include <vector> |
+#include "base/bind.h" |
+#include "base/memory/scoped_vector.h" |
#include "mojo/services/html_viewer/blink_basic_type_converters.h" |
+#include "mojo/services/public/cpp/network/web_socket_read_queue.h" |
+#include "mojo/services/public/cpp/network/web_socket_write_queue.h" |
#include "mojo/services/public/interfaces/network/network_service.mojom.h" |
#include "third_party/WebKit/public/platform/WebSerializedOrigin.h" |
#include "third_party/WebKit/public/platform/WebSocketHandleClient.h" |
@@ -67,12 +71,15 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> { |
private: |
// WebSocketClient methods: |
- virtual void DidConnect( |
- bool fail, |
- const String& selected_subprotocol, |
- const String& extensions) OVERRIDE { |
+ virtual void DidConnect(bool fail, |
+ const String& selected_subprotocol, |
+ const String& extensions, |
+ ScopedDataPipeConsumerHandle receive_stream) |
+ OVERRIDE { |
blink::WebSocketHandleClient* client = client_; |
WebSocketHandleImpl* handle = handle_; |
+ receive_stream_ = receive_stream.Pass(); |
+ read_queue_.reset(new WebSocketReadQueue(receive_stream_.get())); |
if (fail) |
handle->Disconnect(); // deletes |this| |
client->didConnect(handle, |
@@ -84,19 +91,11 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> { |
virtual void DidReceiveData(bool fin, |
WebSocket::MessageType type, |
- ScopedDataPipeConsumerHandle data_pipe) OVERRIDE { |
- uint32_t num_bytes; |
- ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); |
- std::vector<char> data(num_bytes); |
- ReadDataRaw( |
- data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
- const char* data_ptr = data.empty() ? NULL : &data[0]; |
- client_->didReceiveData(handle_, |
- fin, |
- ConvertTo<WebSocketHandle::MessageType>(type), |
- data_ptr, |
- data.size()); |
- // |handle| can be deleted here. |
+ uint32_t num_bytes) OVERRIDE { |
+ read_queue_->Read(num_bytes, |
+ base::Bind(&WebSocketClientImpl::DidReadFromReceiveStream, |
+ base::Unretained(this), |
+ fin, type, num_bytes)); |
} |
virtual void DidReceiveFlowControl(int64_t quota) OVERRIDE { |
@@ -122,8 +121,23 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> { |
// |handle| can be deleted here. |
} |
+ void DidReadFromReceiveStream(bool fin, |
+ WebSocket::MessageType type, |
+ uint32_t num_bytes, |
+ const char* data) { |
+ client_->didReceiveData(handle_, |
+ fin, |
+ ConvertTo<WebSocketHandle::MessageType>(type), |
+ data, |
+ num_bytes); |
+ // |handle_| can be deleted here. |
+ } |
+ |
+ // |handle_| owns this object, so it is guaranteed to outlive us. |
WebSocketHandleImpl* handle_; |
blink::WebSocketHandleClient* client_; |
+ ScopedDataPipeConsumerHandle receive_stream_; |
+ scoped_ptr<WebSocketReadQueue> read_queue_; |
DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl); |
}; |
@@ -150,9 +164,14 @@ void WebSocketHandleImpl::connect(const WebURL& url, |
// TODO(mpcomplete): Is this the right ownership model? Or should mojo own |
// |client_|? |
WeakBindToProxy(client_.get(), &client_ptr); |
+ |
+ DataPipe data_pipe; |
+ send_stream_ = data_pipe.producer_handle.Pass(); |
+ write_queue_.reset(new WebSocketWriteQueue(send_stream_.get())); |
web_socket_->Connect(url.string().utf8(), |
Array<String>::From(protocols), |
origin.string().utf8(), |
+ data_pipe.consumer_handle.Pass(), |
client_ptr.Pass()); |
} |
@@ -163,22 +182,12 @@ void WebSocketHandleImpl::send(bool fin, |
if (!client_) |
return; |
- // TODO(mpcomplete): reuse the data pipe for subsequent sends. |
- uint32_t num_bytes = static_cast<uint32_t>(size); |
- MojoCreateDataPipeOptions options; |
- options.struct_size = sizeof(MojoCreateDataPipeOptions); |
- options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; |
- options.element_num_bytes = 1; |
- options.capacity_num_bytes = num_bytes; |
- DataPipe data_pipe(options); |
- WriteDataRaw(data_pipe.producer_handle.get(), |
- data, |
- &num_bytes, |
- MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); |
- web_socket_->Send( |
- fin, |
- ConvertTo<WebSocket::MessageType>(type), |
- data_pipe.consumer_handle.Pass()); |
+ uint32_t size32 = static_cast<uint32_t>(size); |
+ write_queue_->Write( |
+ data, size32, |
+ base::Bind(&WebSocketHandleImpl::DidWriteToSendStream, |
+ base::Unretained(this), |
+ fin, type, size32)); |
} |
void WebSocketHandleImpl::flowControl(int64_t quota) { |
@@ -192,6 +201,14 @@ void WebSocketHandleImpl::close(unsigned short code, const WebString& reason) { |
web_socket_->Close(code, reason.utf8()); |
} |
+void WebSocketHandleImpl::DidWriteToSendStream( |
+ bool fin, |
+ WebSocketHandle::MessageType type, |
+ uint32_t num_bytes, |
+ const char* data) { |
+ web_socket_->Send(fin, ConvertTo<WebSocket::MessageType>(type), num_bytes); |
+} |
+ |
void WebSocketHandleImpl::Disconnect() { |
did_close_ = true; |
client_.reset(); |