Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(225)

Side by Side Diff: mojo/services/network/web_socket_impl.cc

Issue 550003005: Mojo: WebSocket interface now reuses the DataPipe for subsequent sends or (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/services/network/web_socket_impl.h" 5 #include "mojo/services/network/web_socket_impl.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/message_loop/message_loop.h"
9 #include "mojo/common/handle_watcher.h"
8 #include "mojo/services/network/network_context.h" 10 #include "mojo/services/network/network_context.h"
11 #include "mojo/services/network/web_socket_data_pipe_queue.h"
9 #include "net/websockets/websocket_channel.h" 12 #include "net/websockets/websocket_channel.h"
10 #include "net/websockets/websocket_errors.h" 13 #include "net/websockets/websocket_errors.h"
11 #include "net/websockets/websocket_event_interface.h" 14 #include "net/websockets/websocket_event_interface.h"
12 #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode 15 #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode
13 #include "net/websockets/websocket_handshake_request_info.h" 16 #include "net/websockets/websocket_handshake_request_info.h"
14 #include "net/websockets/websocket_handshake_response_info.h" 17 #include "net/websockets/websocket_handshake_response_info.h"
15 #include "url/origin.h" 18 #include "url/origin.h"
16 19
17 namespace mojo { 20 namespace mojo {
18 21
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
81 virtual ChannelState OnStartOpeningHandshake( 84 virtual ChannelState OnStartOpeningHandshake(
82 scoped_ptr<net::WebSocketHandshakeRequestInfo> request) OVERRIDE; 85 scoped_ptr<net::WebSocketHandshakeRequestInfo> request) OVERRIDE;
83 virtual ChannelState OnFinishOpeningHandshake( 86 virtual ChannelState OnFinishOpeningHandshake(
84 scoped_ptr<net::WebSocketHandshakeResponseInfo> response) OVERRIDE; 87 scoped_ptr<net::WebSocketHandshakeResponseInfo> response) OVERRIDE;
85 virtual ChannelState OnSSLCertificateError( 88 virtual ChannelState OnSSLCertificateError(
86 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, 89 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
87 const GURL& url, 90 const GURL& url,
88 const net::SSLInfo& ssl_info, 91 const net::SSLInfo& ssl_info,
89 bool fatal) OVERRIDE; 92 bool fatal) OVERRIDE;
90 93
94 // Called once we've written to |receive_stream_|.
95 void DidWriteToReceiveStream(bool fin,
96 net::WebSocketFrameHeader::OpCode type,
97 uint32_t num_bytes,
98 const char* buffer);
91 WebSocketClientPtr client_; 99 WebSocketClientPtr client_;
100 ScopedDataPipeProducerHandle receive_stream_;
101 scoped_ptr<WebSocketWriteQueue> write_queue_;
92 102
93 DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler); 103 DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler);
94 }; 104 };
95 105
96 ChannelState WebSocketEventHandler::OnAddChannelResponse( 106 ChannelState WebSocketEventHandler::OnAddChannelResponse(
97 bool fail, 107 bool fail,
98 const std::string& selected_protocol, 108 const std::string& selected_protocol,
99 const std::string& extensions) { 109 const std::string& extensions) {
100 client_->DidConnect(fail, selected_protocol, extensions); 110 DataPipe data_pipe;
111 receive_stream_ = data_pipe.producer_handle.Pass();
112 write_queue_.reset(new WebSocketWriteQueue(receive_stream_.get()));
113 client_->DidConnect(
114 fail, selected_protocol, extensions, data_pipe.consumer_handle.Pass());
101 if (fail) 115 if (fail)
102 return WebSocketEventInterface::CHANNEL_DELETED; 116 return WebSocketEventInterface::CHANNEL_DELETED;
103 return WebSocketEventInterface::CHANNEL_ALIVE; 117 return WebSocketEventInterface::CHANNEL_ALIVE;
104 } 118 }
105 119
106 ChannelState WebSocketEventHandler::OnDataFrame( 120 ChannelState WebSocketEventHandler::OnDataFrame(
107 bool fin, 121 bool fin,
108 net::WebSocketFrameHeader::OpCode type, 122 net::WebSocketFrameHeader::OpCode type,
109 const std::vector<char>& data) { 123 const std::vector<char>& data) {
110 // TODO(mpcomplete): reuse the data pipe for subsequent frames. 124 write_queue_->Write(
111 uint32_t num_bytes = static_cast<uint32_t>(data.size()); 125 &data[0], data.size(),
112 MojoCreateDataPipeOptions options; 126 base::Bind(&WebSocketEventHandler::DidWriteToReceiveStream,
113 options.struct_size = sizeof(MojoCreateDataPipeOptions); 127 base::Unretained(this),
114 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; 128 fin, type, data.size()));
115 options.element_num_bytes = 1;
116 options.capacity_num_bytes = num_bytes;
117 DataPipe data_pipe(options);
118 WriteDataRaw(data_pipe.producer_handle.get(),
119 &data[0],
120 &num_bytes,
121 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
122 client_->DidReceiveData(fin, ConvertTo<WebSocket::MessageType>(type),
123 data_pipe.consumer_handle.Pass());
124 return WebSocketEventInterface::CHANNEL_ALIVE; 129 return WebSocketEventInterface::CHANNEL_ALIVE;
125 } 130 }
126 131
127 ChannelState WebSocketEventHandler::OnClosingHandshake() { 132 ChannelState WebSocketEventHandler::OnClosingHandshake() {
128 return WebSocketEventInterface::CHANNEL_ALIVE; 133 return WebSocketEventInterface::CHANNEL_ALIVE;
129 } 134 }
130 135
131 ChannelState WebSocketEventHandler::OnFlowControl(int64 quota) { 136 ChannelState WebSocketEventHandler::OnFlowControl(int64 quota) {
132 client_->DidReceiveFlowControl(quota); 137 client_->DidReceiveFlowControl(quota);
133 return WebSocketEventInterface::CHANNEL_ALIVE; 138 return WebSocketEventInterface::CHANNEL_ALIVE;
(...skipping 23 matching lines...) Expand all
157 162
158 ChannelState WebSocketEventHandler::OnSSLCertificateError( 163 ChannelState WebSocketEventHandler::OnSSLCertificateError(
159 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, 164 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
160 const GURL& url, 165 const GURL& url,
161 const net::SSLInfo& ssl_info, 166 const net::SSLInfo& ssl_info,
162 bool fatal) { 167 bool fatal) {
163 client_->DidFail("SSL Error"); 168 client_->DidFail("SSL Error");
164 return WebSocketEventInterface::CHANNEL_DELETED; 169 return WebSocketEventInterface::CHANNEL_DELETED;
165 } 170 }
166 171
172 void WebSocketEventHandler::DidWriteToReceiveStream(
173 bool fin,
174 net::WebSocketFrameHeader::OpCode type,
175 uint32_t num_bytes,
176 const char* buffer) {
177 client_->DidReceiveData(
178 fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
179 }
180
167 } // namespace mojo 181 } // namespace mojo
168 182
169 WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) { 183 WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) {
170 } 184 }
171 185
172 WebSocketImpl::~WebSocketImpl() { 186 WebSocketImpl::~WebSocketImpl() {
173 } 187 }
174 188
175 void WebSocketImpl::Connect(const String& url, 189 void WebSocketImpl::Connect(const String& url,
176 Array<String> protocols, 190 Array<String> protocols,
177 const String& origin, 191 const String& origin,
192 ScopedDataPipeConsumerHandle send_stream,
178 WebSocketClientPtr client) { 193 WebSocketClientPtr client) {
179 DCHECK(!channel_); 194 DCHECK(!channel_);
195 send_stream_ = send_stream.Pass();
196 read_queue_.reset(new WebSocketReadQueue(send_stream_.get()));
180 scoped_ptr<net::WebSocketEventInterface> event_interface( 197 scoped_ptr<net::WebSocketEventInterface> event_interface(
181 new WebSocketEventHandler(client.Pass())); 198 new WebSocketEventHandler(client.Pass()));
182 channel_.reset(new net::WebSocketChannel(event_interface.Pass(), 199 channel_.reset(new net::WebSocketChannel(event_interface.Pass(),
183 context_->url_request_context())); 200 context_->url_request_context()));
184 channel_->SendAddChannelRequest(GURL(url.get()), 201 channel_->SendAddChannelRequest(GURL(url.get()),
185 protocols.To<std::vector<std::string> >(), 202 protocols.To<std::vector<std::string> >(),
186 url::Origin(origin.get())); 203 url::Origin(origin.get()));
187 } 204 }
188 205
189 void WebSocketImpl::Send(bool fin, 206 void WebSocketImpl::Send(bool fin,
190 WebSocket::MessageType type, 207 WebSocket::MessageType type,
191 ScopedDataPipeConsumerHandle data_pipe) { 208 uint32_t num_bytes) {
192 DCHECK(channel_); 209 DCHECK(channel_);
193 uint32_t num_bytes; 210 read_queue_->Read(num_bytes,
194 ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); 211 base::Bind(&WebSocketImpl::DidReadFromSendStream,
195 std::vector<char> data(num_bytes); 212 base::Unretained(this),
196 ReadDataRaw(data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE); 213 fin, type, num_bytes));
197 channel_->SendFrame(
198 fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), data);
199 } 214 }
200 215
201 void WebSocketImpl::FlowControl(int64_t quota) { 216 void WebSocketImpl::FlowControl(int64_t quota) {
202 DCHECK(channel_); 217 DCHECK(channel_);
203 channel_->SendFlowControl(quota); 218 channel_->SendFlowControl(quota);
204 } 219 }
205 220
206 void WebSocketImpl::Close(uint16_t code, const String& reason) { 221 void WebSocketImpl::Close(uint16_t code, const String& reason) {
207 DCHECK(channel_); 222 DCHECK(channel_);
208 channel_->StartClosingHandshake(code, reason); 223 channel_->StartClosingHandshake(code, reason);
209 } 224 }
210 225
226 void WebSocketImpl::DidReadFromSendStream(bool fin,
227 WebSocket::MessageType type,
228 uint32_t num_bytes,
229 const char* data) {
230 std::vector<char> buffer(num_bytes);
231 memcpy(&buffer[0], data, num_bytes);
232 DCHECK(channel_);
233 channel_->SendFrame(
234 fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), buffer);
235 }
236
211 } // namespace mojo 237 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698