OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/services/network/web_socket_impl.h" | 5 #include "mojo/services/network/web_socket_impl.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "base/message_loop/message_loop.h" |
| 9 #include "mojo/common/handle_watcher.h" |
8 #include "mojo/services/network/network_context.h" | 10 #include "mojo/services/network/network_context.h" |
| 11 #include "mojo/services/public/cpp/network/web_socket_read_queue.h" |
| 12 #include "mojo/services/public/cpp/network/web_socket_write_queue.h" |
9 #include "net/websockets/websocket_channel.h" | 13 #include "net/websockets/websocket_channel.h" |
10 #include "net/websockets/websocket_errors.h" | 14 #include "net/websockets/websocket_errors.h" |
11 #include "net/websockets/websocket_event_interface.h" | 15 #include "net/websockets/websocket_event_interface.h" |
12 #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode | 16 #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode |
13 #include "net/websockets/websocket_handshake_request_info.h" | 17 #include "net/websockets/websocket_handshake_request_info.h" |
14 #include "net/websockets/websocket_handshake_response_info.h" | 18 #include "net/websockets/websocket_handshake_response_info.h" |
15 #include "url/origin.h" | 19 #include "url/origin.h" |
16 | 20 |
17 namespace mojo { | 21 namespace mojo { |
18 | 22 |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
81 virtual ChannelState OnStartOpeningHandshake( | 85 virtual ChannelState OnStartOpeningHandshake( |
82 scoped_ptr<net::WebSocketHandshakeRequestInfo> request) OVERRIDE; | 86 scoped_ptr<net::WebSocketHandshakeRequestInfo> request) OVERRIDE; |
83 virtual ChannelState OnFinishOpeningHandshake( | 87 virtual ChannelState OnFinishOpeningHandshake( |
84 scoped_ptr<net::WebSocketHandshakeResponseInfo> response) OVERRIDE; | 88 scoped_ptr<net::WebSocketHandshakeResponseInfo> response) OVERRIDE; |
85 virtual ChannelState OnSSLCertificateError( | 89 virtual ChannelState OnSSLCertificateError( |
86 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, | 90 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, |
87 const GURL& url, | 91 const GURL& url, |
88 const net::SSLInfo& ssl_info, | 92 const net::SSLInfo& ssl_info, |
89 bool fatal) OVERRIDE; | 93 bool fatal) OVERRIDE; |
90 | 94 |
| 95 // Called once we've written to |receive_stream_|. |
| 96 void DidWriteToReceiveStream(bool fin, |
| 97 net::WebSocketFrameHeader::OpCode type, |
| 98 uint32_t num_bytes, |
| 99 const char* buffer); |
91 WebSocketClientPtr client_; | 100 WebSocketClientPtr client_; |
| 101 ScopedDataPipeProducerHandle receive_stream_; |
| 102 scoped_ptr<WebSocketWriteQueue> write_queue_; |
92 | 103 |
93 DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler); | 104 DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler); |
94 }; | 105 }; |
95 | 106 |
96 ChannelState WebSocketEventHandler::OnAddChannelResponse( | 107 ChannelState WebSocketEventHandler::OnAddChannelResponse( |
97 bool fail, | 108 bool fail, |
98 const std::string& selected_protocol, | 109 const std::string& selected_protocol, |
99 const std::string& extensions) { | 110 const std::string& extensions) { |
100 client_->DidConnect(fail, selected_protocol, extensions); | 111 DataPipe data_pipe; |
| 112 receive_stream_ = data_pipe.producer_handle.Pass(); |
| 113 write_queue_.reset(new WebSocketWriteQueue(receive_stream_.get())); |
| 114 client_->DidConnect( |
| 115 fail, selected_protocol, extensions, data_pipe.consumer_handle.Pass()); |
101 if (fail) | 116 if (fail) |
102 return WebSocketEventInterface::CHANNEL_DELETED; | 117 return WebSocketEventInterface::CHANNEL_DELETED; |
103 return WebSocketEventInterface::CHANNEL_ALIVE; | 118 return WebSocketEventInterface::CHANNEL_ALIVE; |
104 } | 119 } |
105 | 120 |
106 ChannelState WebSocketEventHandler::OnDataFrame( | 121 ChannelState WebSocketEventHandler::OnDataFrame( |
107 bool fin, | 122 bool fin, |
108 net::WebSocketFrameHeader::OpCode type, | 123 net::WebSocketFrameHeader::OpCode type, |
109 const std::vector<char>& data) { | 124 const std::vector<char>& data) { |
110 // TODO(mpcomplete): reuse the data pipe for subsequent frames. | 125 uint32_t size = static_cast<uint32_t>(data.size()); |
111 uint32_t num_bytes = static_cast<uint32_t>(data.size()); | 126 write_queue_->Write( |
112 MojoCreateDataPipeOptions options; | 127 &data[0], size, |
113 options.struct_size = sizeof(MojoCreateDataPipeOptions); | 128 base::Bind(&WebSocketEventHandler::DidWriteToReceiveStream, |
114 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; | 129 base::Unretained(this), |
115 options.element_num_bytes = 1; | 130 fin, type, size)); |
116 options.capacity_num_bytes = num_bytes; | |
117 DataPipe data_pipe(options); | |
118 WriteDataRaw(data_pipe.producer_handle.get(), | |
119 &data[0], | |
120 &num_bytes, | |
121 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); | |
122 client_->DidReceiveData(fin, ConvertTo<WebSocket::MessageType>(type), | |
123 data_pipe.consumer_handle.Pass()); | |
124 return WebSocketEventInterface::CHANNEL_ALIVE; | 131 return WebSocketEventInterface::CHANNEL_ALIVE; |
125 } | 132 } |
126 | 133 |
127 ChannelState WebSocketEventHandler::OnClosingHandshake() { | 134 ChannelState WebSocketEventHandler::OnClosingHandshake() { |
128 return WebSocketEventInterface::CHANNEL_ALIVE; | 135 return WebSocketEventInterface::CHANNEL_ALIVE; |
129 } | 136 } |
130 | 137 |
131 ChannelState WebSocketEventHandler::OnFlowControl(int64 quota) { | 138 ChannelState WebSocketEventHandler::OnFlowControl(int64 quota) { |
132 client_->DidReceiveFlowControl(quota); | 139 client_->DidReceiveFlowControl(quota); |
133 return WebSocketEventInterface::CHANNEL_ALIVE; | 140 return WebSocketEventInterface::CHANNEL_ALIVE; |
(...skipping 23 matching lines...) Expand all Loading... |
157 | 164 |
158 ChannelState WebSocketEventHandler::OnSSLCertificateError( | 165 ChannelState WebSocketEventHandler::OnSSLCertificateError( |
159 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, | 166 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, |
160 const GURL& url, | 167 const GURL& url, |
161 const net::SSLInfo& ssl_info, | 168 const net::SSLInfo& ssl_info, |
162 bool fatal) { | 169 bool fatal) { |
163 client_->DidFail("SSL Error"); | 170 client_->DidFail("SSL Error"); |
164 return WebSocketEventInterface::CHANNEL_DELETED; | 171 return WebSocketEventInterface::CHANNEL_DELETED; |
165 } | 172 } |
166 | 173 |
| 174 void WebSocketEventHandler::DidWriteToReceiveStream( |
| 175 bool fin, |
| 176 net::WebSocketFrameHeader::OpCode type, |
| 177 uint32_t num_bytes, |
| 178 const char* buffer) { |
| 179 client_->DidReceiveData( |
| 180 fin, ConvertTo<WebSocket::MessageType>(type), num_bytes); |
| 181 } |
| 182 |
167 } // namespace mojo | 183 } // namespace mojo |
168 | 184 |
169 WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) { | 185 WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) { |
170 } | 186 } |
171 | 187 |
172 WebSocketImpl::~WebSocketImpl() { | 188 WebSocketImpl::~WebSocketImpl() { |
173 } | 189 } |
174 | 190 |
175 void WebSocketImpl::Connect(const String& url, | 191 void WebSocketImpl::Connect(const String& url, |
176 Array<String> protocols, | 192 Array<String> protocols, |
177 const String& origin, | 193 const String& origin, |
| 194 ScopedDataPipeConsumerHandle send_stream, |
178 WebSocketClientPtr client) { | 195 WebSocketClientPtr client) { |
179 DCHECK(!channel_); | 196 DCHECK(!channel_); |
| 197 send_stream_ = send_stream.Pass(); |
| 198 read_queue_.reset(new WebSocketReadQueue(send_stream_.get())); |
180 scoped_ptr<net::WebSocketEventInterface> event_interface( | 199 scoped_ptr<net::WebSocketEventInterface> event_interface( |
181 new WebSocketEventHandler(client.Pass())); | 200 new WebSocketEventHandler(client.Pass())); |
182 channel_.reset(new net::WebSocketChannel(event_interface.Pass(), | 201 channel_.reset(new net::WebSocketChannel(event_interface.Pass(), |
183 context_->url_request_context())); | 202 context_->url_request_context())); |
184 channel_->SendAddChannelRequest(GURL(url.get()), | 203 channel_->SendAddChannelRequest(GURL(url.get()), |
185 protocols.To<std::vector<std::string> >(), | 204 protocols.To<std::vector<std::string> >(), |
186 url::Origin(origin.get())); | 205 url::Origin(origin.get())); |
187 } | 206 } |
188 | 207 |
189 void WebSocketImpl::Send(bool fin, | 208 void WebSocketImpl::Send(bool fin, |
190 WebSocket::MessageType type, | 209 WebSocket::MessageType type, |
191 ScopedDataPipeConsumerHandle data_pipe) { | 210 uint32_t num_bytes) { |
192 DCHECK(channel_); | 211 DCHECK(channel_); |
193 uint32_t num_bytes; | 212 read_queue_->Read(num_bytes, |
194 ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); | 213 base::Bind(&WebSocketImpl::DidReadFromSendStream, |
195 std::vector<char> data(num_bytes); | 214 base::Unretained(this), |
196 ReadDataRaw(data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE); | 215 fin, type, num_bytes)); |
197 channel_->SendFrame( | |
198 fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), data); | |
199 } | 216 } |
200 | 217 |
201 void WebSocketImpl::FlowControl(int64_t quota) { | 218 void WebSocketImpl::FlowControl(int64_t quota) { |
202 DCHECK(channel_); | 219 DCHECK(channel_); |
203 channel_->SendFlowControl(quota); | 220 channel_->SendFlowControl(quota); |
204 } | 221 } |
205 | 222 |
206 void WebSocketImpl::Close(uint16_t code, const String& reason) { | 223 void WebSocketImpl::Close(uint16_t code, const String& reason) { |
207 DCHECK(channel_); | 224 DCHECK(channel_); |
208 channel_->StartClosingHandshake(code, reason); | 225 channel_->StartClosingHandshake(code, reason); |
209 } | 226 } |
210 | 227 |
| 228 void WebSocketImpl::DidReadFromSendStream(bool fin, |
| 229 WebSocket::MessageType type, |
| 230 uint32_t num_bytes, |
| 231 const char* data) { |
| 232 std::vector<char> buffer(num_bytes); |
| 233 memcpy(&buffer[0], data, num_bytes); |
| 234 DCHECK(channel_); |
| 235 channel_->SendFrame( |
| 236 fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), buffer); |
| 237 } |
| 238 |
211 } // namespace mojo | 239 } // namespace mojo |
OLD | NEW |