Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(236)

Side by Side Diff: mojo/services/network/web_socket_impl.cc

Issue 550003005: Mojo: WebSocket interface now reuses the DataPipe for subsequent sends or (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: give up Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/services/network/web_socket_impl.h" 5 #include "mojo/services/network/web_socket_impl.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/message_loop/message_loop.h"
9 #include "mojo/common/handle_watcher.h"
8 #include "mojo/services/network/network_context.h" 10 #include "mojo/services/network/network_context.h"
11 #include "mojo/services/public/cpp/network/web_socket_read_queue.h"
12 #include "mojo/services/public/cpp/network/web_socket_write_queue.h"
9 #include "net/websockets/websocket_channel.h" 13 #include "net/websockets/websocket_channel.h"
10 #include "net/websockets/websocket_errors.h" 14 #include "net/websockets/websocket_errors.h"
11 #include "net/websockets/websocket_event_interface.h" 15 #include "net/websockets/websocket_event_interface.h"
12 #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode 16 #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode
13 #include "net/websockets/websocket_handshake_request_info.h" 17 #include "net/websockets/websocket_handshake_request_info.h"
14 #include "net/websockets/websocket_handshake_response_info.h" 18 #include "net/websockets/websocket_handshake_response_info.h"
15 #include "url/origin.h" 19 #include "url/origin.h"
16 20
17 namespace mojo { 21 namespace mojo {
18 22
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
81 virtual ChannelState OnStartOpeningHandshake( 85 virtual ChannelState OnStartOpeningHandshake(
82 scoped_ptr<net::WebSocketHandshakeRequestInfo> request) OVERRIDE; 86 scoped_ptr<net::WebSocketHandshakeRequestInfo> request) OVERRIDE;
83 virtual ChannelState OnFinishOpeningHandshake( 87 virtual ChannelState OnFinishOpeningHandshake(
84 scoped_ptr<net::WebSocketHandshakeResponseInfo> response) OVERRIDE; 88 scoped_ptr<net::WebSocketHandshakeResponseInfo> response) OVERRIDE;
85 virtual ChannelState OnSSLCertificateError( 89 virtual ChannelState OnSSLCertificateError(
86 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, 90 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
87 const GURL& url, 91 const GURL& url,
88 const net::SSLInfo& ssl_info, 92 const net::SSLInfo& ssl_info,
89 bool fatal) OVERRIDE; 93 bool fatal) OVERRIDE;
90 94
95 // Called once we've written to |receive_stream_|.
96 void DidWriteToReceiveStream(bool fin,
97 net::WebSocketFrameHeader::OpCode type,
98 uint32_t num_bytes,
99 const char* buffer);
91 WebSocketClientPtr client_; 100 WebSocketClientPtr client_;
101 ScopedDataPipeProducerHandle receive_stream_;
102 scoped_ptr<WebSocketWriteQueue> write_queue_;
92 103
93 DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler); 104 DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler);
94 }; 105 };
95 106
96 ChannelState WebSocketEventHandler::OnAddChannelResponse( 107 ChannelState WebSocketEventHandler::OnAddChannelResponse(
97 bool fail, 108 bool fail,
98 const std::string& selected_protocol, 109 const std::string& selected_protocol,
99 const std::string& extensions) { 110 const std::string& extensions) {
100 client_->DidConnect(fail, selected_protocol, extensions); 111 DataPipe data_pipe;
112 receive_stream_ = data_pipe.producer_handle.Pass();
113 write_queue_.reset(new WebSocketWriteQueue(receive_stream_.get()));
114 client_->DidConnect(
115 fail, selected_protocol, extensions, data_pipe.consumer_handle.Pass());
101 if (fail) 116 if (fail)
102 return WebSocketEventInterface::CHANNEL_DELETED; 117 return WebSocketEventInterface::CHANNEL_DELETED;
103 return WebSocketEventInterface::CHANNEL_ALIVE; 118 return WebSocketEventInterface::CHANNEL_ALIVE;
104 } 119 }
105 120
106 ChannelState WebSocketEventHandler::OnDataFrame( 121 ChannelState WebSocketEventHandler::OnDataFrame(
107 bool fin, 122 bool fin,
108 net::WebSocketFrameHeader::OpCode type, 123 net::WebSocketFrameHeader::OpCode type,
109 const std::vector<char>& data) { 124 const std::vector<char>& data) {
110 // TODO(mpcomplete): reuse the data pipe for subsequent frames. 125 uint32_t size = static_cast<uint32_t>(data.size());
111 uint32_t num_bytes = static_cast<uint32_t>(data.size()); 126 write_queue_->Write(
112 MojoCreateDataPipeOptions options; 127 &data[0], size,
113 options.struct_size = sizeof(MojoCreateDataPipeOptions); 128 base::Bind(&WebSocketEventHandler::DidWriteToReceiveStream,
114 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; 129 base::Unretained(this),
115 options.element_num_bytes = 1; 130 fin, type, size));
116 options.capacity_num_bytes = num_bytes;
117 DataPipe data_pipe(options);
118 WriteDataRaw(data_pipe.producer_handle.get(),
119 &data[0],
120 &num_bytes,
121 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
122 client_->DidReceiveData(fin, ConvertTo<WebSocket::MessageType>(type),
123 data_pipe.consumer_handle.Pass());
124 return WebSocketEventInterface::CHANNEL_ALIVE; 131 return WebSocketEventInterface::CHANNEL_ALIVE;
125 } 132 }
126 133
127 ChannelState WebSocketEventHandler::OnClosingHandshake() { 134 ChannelState WebSocketEventHandler::OnClosingHandshake() {
128 return WebSocketEventInterface::CHANNEL_ALIVE; 135 return WebSocketEventInterface::CHANNEL_ALIVE;
129 } 136 }
130 137
131 ChannelState WebSocketEventHandler::OnFlowControl(int64 quota) { 138 ChannelState WebSocketEventHandler::OnFlowControl(int64 quota) {
132 client_->DidReceiveFlowControl(quota); 139 client_->DidReceiveFlowControl(quota);
133 return WebSocketEventInterface::CHANNEL_ALIVE; 140 return WebSocketEventInterface::CHANNEL_ALIVE;
(...skipping 23 matching lines...) Expand all
157 164
158 ChannelState WebSocketEventHandler::OnSSLCertificateError( 165 ChannelState WebSocketEventHandler::OnSSLCertificateError(
159 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, 166 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
160 const GURL& url, 167 const GURL& url,
161 const net::SSLInfo& ssl_info, 168 const net::SSLInfo& ssl_info,
162 bool fatal) { 169 bool fatal) {
163 client_->DidFail("SSL Error"); 170 client_->DidFail("SSL Error");
164 return WebSocketEventInterface::CHANNEL_DELETED; 171 return WebSocketEventInterface::CHANNEL_DELETED;
165 } 172 }
166 173
174 void WebSocketEventHandler::DidWriteToReceiveStream(
175 bool fin,
176 net::WebSocketFrameHeader::OpCode type,
177 uint32_t num_bytes,
178 const char* buffer) {
179 client_->DidReceiveData(
180 fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
181 }
182
167 } // namespace mojo 183 } // namespace mojo
168 184
169 WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) { 185 WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) {
170 } 186 }
171 187
172 WebSocketImpl::~WebSocketImpl() { 188 WebSocketImpl::~WebSocketImpl() {
173 } 189 }
174 190
175 void WebSocketImpl::Connect(const String& url, 191 void WebSocketImpl::Connect(const String& url,
176 Array<String> protocols, 192 Array<String> protocols,
177 const String& origin, 193 const String& origin,
194 ScopedDataPipeConsumerHandle send_stream,
178 WebSocketClientPtr client) { 195 WebSocketClientPtr client) {
179 DCHECK(!channel_); 196 DCHECK(!channel_);
197 send_stream_ = send_stream.Pass();
198 read_queue_.reset(new WebSocketReadQueue(send_stream_.get()));
180 scoped_ptr<net::WebSocketEventInterface> event_interface( 199 scoped_ptr<net::WebSocketEventInterface> event_interface(
181 new WebSocketEventHandler(client.Pass())); 200 new WebSocketEventHandler(client.Pass()));
182 channel_.reset(new net::WebSocketChannel(event_interface.Pass(), 201 channel_.reset(new net::WebSocketChannel(event_interface.Pass(),
183 context_->url_request_context())); 202 context_->url_request_context()));
184 channel_->SendAddChannelRequest(GURL(url.get()), 203 channel_->SendAddChannelRequest(GURL(url.get()),
185 protocols.To<std::vector<std::string> >(), 204 protocols.To<std::vector<std::string> >(),
186 url::Origin(origin.get())); 205 url::Origin(origin.get()));
187 } 206 }
188 207
189 void WebSocketImpl::Send(bool fin, 208 void WebSocketImpl::Send(bool fin,
190 WebSocket::MessageType type, 209 WebSocket::MessageType type,
191 ScopedDataPipeConsumerHandle data_pipe) { 210 uint32_t num_bytes) {
192 DCHECK(channel_); 211 DCHECK(channel_);
193 uint32_t num_bytes; 212 read_queue_->Read(num_bytes,
194 ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); 213 base::Bind(&WebSocketImpl::DidReadFromSendStream,
195 std::vector<char> data(num_bytes); 214 base::Unretained(this),
196 ReadDataRaw(data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE); 215 fin, type, num_bytes));
197 channel_->SendFrame(
198 fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), data);
199 } 216 }
200 217
201 void WebSocketImpl::FlowControl(int64_t quota) { 218 void WebSocketImpl::FlowControl(int64_t quota) {
202 DCHECK(channel_); 219 DCHECK(channel_);
203 channel_->SendFlowControl(quota); 220 channel_->SendFlowControl(quota);
204 } 221 }
205 222
206 void WebSocketImpl::Close(uint16_t code, const String& reason) { 223 void WebSocketImpl::Close(uint16_t code, const String& reason) {
207 DCHECK(channel_); 224 DCHECK(channel_);
208 channel_->StartClosingHandshake(code, reason); 225 channel_->StartClosingHandshake(code, reason);
209 } 226 }
210 227
228 void WebSocketImpl::DidReadFromSendStream(bool fin,
229 WebSocket::MessageType type,
230 uint32_t num_bytes,
231 const char* data) {
232 std::vector<char> buffer(num_bytes);
233 memcpy(&buffer[0], data, num_bytes);
234 DCHECK(channel_);
235 channel_->SendFrame(
236 fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), buffer);
237 }
238
211 } // namespace mojo 239 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698