Index: mojo/services/network/web_socket_impl.cc |
diff --git a/mojo/services/network/web_socket_impl.cc b/mojo/services/network/web_socket_impl.cc |
index f65249686584486a85f72fdb59b5fb97710c8646..7ba9b22da9657457dc87812e099919a95bab50dc 100644 |
--- a/mojo/services/network/web_socket_impl.cc |
+++ b/mojo/services/network/web_socket_impl.cc |
@@ -5,7 +5,11 @@ |
#include "mojo/services/network/web_socket_impl.h" |
#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "mojo/common/handle_watcher.h" |
#include "mojo/services/network/network_context.h" |
+#include "mojo/services/public/cpp/network/web_socket_read_queue.h" |
+#include "mojo/services/public/cpp/network/web_socket_write_queue.h" |
#include "net/websockets/websocket_channel.h" |
#include "net/websockets/websocket_errors.h" |
#include "net/websockets/websocket_event_interface.h" |
@@ -88,7 +92,14 @@ struct WebSocketEventHandler : public net::WebSocketEventInterface { |
const net::SSLInfo& ssl_info, |
bool fatal) OVERRIDE; |
+ // Called once we've written to |receive_stream_|. |
+ void DidWriteToReceiveStream(bool fin, |
+ net::WebSocketFrameHeader::OpCode type, |
+ uint32_t num_bytes, |
+ const char* buffer); |
WebSocketClientPtr client_; |
+ ScopedDataPipeProducerHandle receive_stream_; |
+ scoped_ptr<WebSocketWriteQueue> write_queue_; |
DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler); |
}; |
@@ -97,7 +108,11 @@ ChannelState WebSocketEventHandler::OnAddChannelResponse( |
bool fail, |
const std::string& selected_protocol, |
const std::string& extensions) { |
- client_->DidConnect(fail, selected_protocol, extensions); |
+ DataPipe data_pipe; |
+ receive_stream_ = data_pipe.producer_handle.Pass(); |
+ write_queue_.reset(new WebSocketWriteQueue(receive_stream_.get())); |
+ client_->DidConnect( |
+ fail, selected_protocol, extensions, data_pipe.consumer_handle.Pass()); |
if (fail) |
return WebSocketEventInterface::CHANNEL_DELETED; |
return WebSocketEventInterface::CHANNEL_ALIVE; |
@@ -107,20 +122,12 @@ ChannelState WebSocketEventHandler::OnDataFrame( |
bool fin, |
net::WebSocketFrameHeader::OpCode type, |
const std::vector<char>& data) { |
- // TODO(mpcomplete): reuse the data pipe for subsequent frames. |
- uint32_t num_bytes = static_cast<uint32_t>(data.size()); |
- MojoCreateDataPipeOptions options; |
- options.struct_size = sizeof(MojoCreateDataPipeOptions); |
- options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; |
- options.element_num_bytes = 1; |
- options.capacity_num_bytes = num_bytes; |
- DataPipe data_pipe(options); |
- WriteDataRaw(data_pipe.producer_handle.get(), |
- &data[0], |
- &num_bytes, |
- MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); |
- client_->DidReceiveData(fin, ConvertTo<WebSocket::MessageType>(type), |
- data_pipe.consumer_handle.Pass()); |
+ uint32_t size = static_cast<uint32_t>(data.size()); |
+ write_queue_->Write( |
+ &data[0], size, |
+ base::Bind(&WebSocketEventHandler::DidWriteToReceiveStream, |
+ base::Unretained(this), |
+ fin, type, size)); |
return WebSocketEventInterface::CHANNEL_ALIVE; |
} |
@@ -164,6 +171,15 @@ ChannelState WebSocketEventHandler::OnSSLCertificateError( |
return WebSocketEventInterface::CHANNEL_DELETED; |
} |
+void WebSocketEventHandler::DidWriteToReceiveStream( |
+ bool fin, |
+ net::WebSocketFrameHeader::OpCode type, |
+ uint32_t num_bytes, |
+ const char* buffer) { |
+ client_->DidReceiveData( |
+ fin, ConvertTo<WebSocket::MessageType>(type), num_bytes); |
+} |
+ |
} // namespace mojo |
WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) { |
@@ -175,8 +191,11 @@ WebSocketImpl::~WebSocketImpl() { |
void WebSocketImpl::Connect(const String& url, |
Array<String> protocols, |
const String& origin, |
+ ScopedDataPipeConsumerHandle send_stream, |
WebSocketClientPtr client) { |
DCHECK(!channel_); |
+ send_stream_ = send_stream.Pass(); |
+ read_queue_.reset(new WebSocketReadQueue(send_stream_.get())); |
scoped_ptr<net::WebSocketEventInterface> event_interface( |
new WebSocketEventHandler(client.Pass())); |
channel_.reset(new net::WebSocketChannel(event_interface.Pass(), |
@@ -188,14 +207,12 @@ void WebSocketImpl::Connect(const String& url, |
void WebSocketImpl::Send(bool fin, |
WebSocket::MessageType type, |
- ScopedDataPipeConsumerHandle data_pipe) { |
+ uint32_t num_bytes) { |
DCHECK(channel_); |
- uint32_t num_bytes; |
- ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); |
- std::vector<char> data(num_bytes); |
- ReadDataRaw(data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
- channel_->SendFrame( |
- fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), data); |
+ read_queue_->Read(num_bytes, |
+ base::Bind(&WebSocketImpl::DidReadFromSendStream, |
+ base::Unretained(this), |
+ fin, type, num_bytes)); |
} |
void WebSocketImpl::FlowControl(int64_t quota) { |
@@ -208,4 +225,15 @@ void WebSocketImpl::Close(uint16_t code, const String& reason) { |
channel_->StartClosingHandshake(code, reason); |
} |
+void WebSocketImpl::DidReadFromSendStream(bool fin, |
+ WebSocket::MessageType type, |
+ uint32_t num_bytes, |
+ const char* data) { |
+ std::vector<char> buffer(num_bytes); |
+ memcpy(&buffer[0], data, num_bytes); |
+ DCHECK(channel_); |
+ channel_->SendFrame( |
+ fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), buffer); |
+} |
+ |
} // namespace mojo |