OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/services/network/web_socket_impl.h" | 5 #include "mojo/services/network/web_socket_impl.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "base/message_loop/message_loop.h" |
| 9 #include "mojo/common/handle_watcher.h" |
8 #include "mojo/services/network/network_context.h" | 10 #include "mojo/services/network/network_context.h" |
| 11 #include "mojo/services/network/web_socket_data_pipe_queue.h" |
9 #include "net/websockets/websocket_channel.h" | 12 #include "net/websockets/websocket_channel.h" |
10 #include "net/websockets/websocket_errors.h" | 13 #include "net/websockets/websocket_errors.h" |
11 #include "net/websockets/websocket_event_interface.h" | 14 #include "net/websockets/websocket_event_interface.h" |
12 #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode | 15 #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode |
13 #include "net/websockets/websocket_handshake_request_info.h" | 16 #include "net/websockets/websocket_handshake_request_info.h" |
14 #include "net/websockets/websocket_handshake_response_info.h" | 17 #include "net/websockets/websocket_handshake_response_info.h" |
15 #include "url/origin.h" | 18 #include "url/origin.h" |
16 | 19 |
17 namespace mojo { | 20 namespace mojo { |
18 | 21 |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
81 virtual ChannelState OnStartOpeningHandshake( | 84 virtual ChannelState OnStartOpeningHandshake( |
82 scoped_ptr<net::WebSocketHandshakeRequestInfo> request) OVERRIDE; | 85 scoped_ptr<net::WebSocketHandshakeRequestInfo> request) OVERRIDE; |
83 virtual ChannelState OnFinishOpeningHandshake( | 86 virtual ChannelState OnFinishOpeningHandshake( |
84 scoped_ptr<net::WebSocketHandshakeResponseInfo> response) OVERRIDE; | 87 scoped_ptr<net::WebSocketHandshakeResponseInfo> response) OVERRIDE; |
85 virtual ChannelState OnSSLCertificateError( | 88 virtual ChannelState OnSSLCertificateError( |
86 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, | 89 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, |
87 const GURL& url, | 90 const GURL& url, |
88 const net::SSLInfo& ssl_info, | 91 const net::SSLInfo& ssl_info, |
89 bool fatal) OVERRIDE; | 92 bool fatal) OVERRIDE; |
90 | 93 |
| 94 // Called once we've written to |receive_stream_|. |
| 95 void DidWriteToReceiveStream(bool fin, |
| 96 net::WebSocketFrameHeader::OpCode type, |
| 97 uint32_t num_bytes, |
| 98 const char* buffer); |
91 WebSocketClientPtr client_; | 99 WebSocketClientPtr client_; |
| 100 ScopedDataPipeProducerHandle receive_stream_; |
| 101 scoped_ptr<WebSocketWriteQueue> write_queue_; |
92 | 102 |
93 DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler); | 103 DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler); |
94 }; | 104 }; |
95 | 105 |
96 ChannelState WebSocketEventHandler::OnAddChannelResponse( | 106 ChannelState WebSocketEventHandler::OnAddChannelResponse( |
97 bool fail, | 107 bool fail, |
98 const std::string& selected_protocol, | 108 const std::string& selected_protocol, |
99 const std::string& extensions) { | 109 const std::string& extensions) { |
100 client_->DidConnect(fail, selected_protocol, extensions); | 110 DataPipe data_pipe; |
| 111 receive_stream_ = data_pipe.producer_handle.Pass(); |
| 112 write_queue_.reset(new WebSocketWriteQueue(receive_stream_.get())); |
| 113 client_->DidConnect( |
| 114 fail, selected_protocol, extensions, data_pipe.consumer_handle.Pass()); |
101 if (fail) | 115 if (fail) |
102 return WebSocketEventInterface::CHANNEL_DELETED; | 116 return WebSocketEventInterface::CHANNEL_DELETED; |
103 return WebSocketEventInterface::CHANNEL_ALIVE; | 117 return WebSocketEventInterface::CHANNEL_ALIVE; |
104 } | 118 } |
105 | 119 |
106 ChannelState WebSocketEventHandler::OnDataFrame( | 120 ChannelState WebSocketEventHandler::OnDataFrame( |
107 bool fin, | 121 bool fin, |
108 net::WebSocketFrameHeader::OpCode type, | 122 net::WebSocketFrameHeader::OpCode type, |
109 const std::vector<char>& data) { | 123 const std::vector<char>& data) { |
110 // TODO(mpcomplete): reuse the data pipe for subsequent frames. | 124 write_queue_->Write( |
111 uint32_t num_bytes = static_cast<uint32_t>(data.size()); | 125 &data[0], data.size(), |
112 MojoCreateDataPipeOptions options; | 126 base::Bind(&WebSocketEventHandler::DidWriteToReceiveStream, |
113 options.struct_size = sizeof(MojoCreateDataPipeOptions); | 127 base::Unretained(this), |
114 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; | 128 fin, type, data.size())); |
115 options.element_num_bytes = 1; | |
116 options.capacity_num_bytes = num_bytes; | |
117 DataPipe data_pipe(options); | |
118 WriteDataRaw(data_pipe.producer_handle.get(), | |
119 &data[0], | |
120 &num_bytes, | |
121 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); | |
122 client_->DidReceiveData(fin, ConvertTo<WebSocket::MessageType>(type), | |
123 data_pipe.consumer_handle.Pass()); | |
124 return WebSocketEventInterface::CHANNEL_ALIVE; | 129 return WebSocketEventInterface::CHANNEL_ALIVE; |
125 } | 130 } |
126 | 131 |
127 ChannelState WebSocketEventHandler::OnClosingHandshake() { | 132 ChannelState WebSocketEventHandler::OnClosingHandshake() { |
128 return WebSocketEventInterface::CHANNEL_ALIVE; | 133 return WebSocketEventInterface::CHANNEL_ALIVE; |
129 } | 134 } |
130 | 135 |
131 ChannelState WebSocketEventHandler::OnFlowControl(int64 quota) { | 136 ChannelState WebSocketEventHandler::OnFlowControl(int64 quota) { |
132 client_->DidReceiveFlowControl(quota); | 137 client_->DidReceiveFlowControl(quota); |
133 return WebSocketEventInterface::CHANNEL_ALIVE; | 138 return WebSocketEventInterface::CHANNEL_ALIVE; |
(...skipping 23 matching lines...) Expand all Loading... |
157 | 162 |
158 ChannelState WebSocketEventHandler::OnSSLCertificateError( | 163 ChannelState WebSocketEventHandler::OnSSLCertificateError( |
159 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, | 164 scoped_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, |
160 const GURL& url, | 165 const GURL& url, |
161 const net::SSLInfo& ssl_info, | 166 const net::SSLInfo& ssl_info, |
162 bool fatal) { | 167 bool fatal) { |
163 client_->DidFail("SSL Error"); | 168 client_->DidFail("SSL Error"); |
164 return WebSocketEventInterface::CHANNEL_DELETED; | 169 return WebSocketEventInterface::CHANNEL_DELETED; |
165 } | 170 } |
166 | 171 |
| 172 void WebSocketEventHandler::DidWriteToReceiveStream( |
| 173 bool fin, |
| 174 net::WebSocketFrameHeader::OpCode type, |
| 175 uint32_t num_bytes, |
| 176 const char* buffer) { |
| 177 client_->DidReceiveData( |
| 178 fin, ConvertTo<WebSocket::MessageType>(type), num_bytes); |
| 179 } |
| 180 |
167 } // namespace mojo | 181 } // namespace mojo |
168 | 182 |
169 WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) { | 183 WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) { |
170 } | 184 } |
171 | 185 |
172 WebSocketImpl::~WebSocketImpl() { | 186 WebSocketImpl::~WebSocketImpl() { |
173 } | 187 } |
174 | 188 |
175 void WebSocketImpl::Connect(const String& url, | 189 void WebSocketImpl::Connect(const String& url, |
176 Array<String> protocols, | 190 Array<String> protocols, |
177 const String& origin, | 191 const String& origin, |
| 192 ScopedDataPipeConsumerHandle send_stream, |
178 WebSocketClientPtr client) { | 193 WebSocketClientPtr client) { |
179 DCHECK(!channel_); | 194 DCHECK(!channel_); |
| 195 send_stream_ = send_stream.Pass(); |
| 196 read_queue_.reset(new WebSocketReadQueue(send_stream_.get())); |
180 scoped_ptr<net::WebSocketEventInterface> event_interface( | 197 scoped_ptr<net::WebSocketEventInterface> event_interface( |
181 new WebSocketEventHandler(client.Pass())); | 198 new WebSocketEventHandler(client.Pass())); |
182 channel_.reset(new net::WebSocketChannel(event_interface.Pass(), | 199 channel_.reset(new net::WebSocketChannel(event_interface.Pass(), |
183 context_->url_request_context())); | 200 context_->url_request_context())); |
184 channel_->SendAddChannelRequest(GURL(url.get()), | 201 channel_->SendAddChannelRequest(GURL(url.get()), |
185 protocols.To<std::vector<std::string> >(), | 202 protocols.To<std::vector<std::string> >(), |
186 url::Origin(origin.get())); | 203 url::Origin(origin.get())); |
187 } | 204 } |
188 | 205 |
189 void WebSocketImpl::Send(bool fin, | 206 void WebSocketImpl::Send(bool fin, |
190 WebSocket::MessageType type, | 207 WebSocket::MessageType type, |
191 ScopedDataPipeConsumerHandle data_pipe) { | 208 uint32_t num_bytes) { |
192 DCHECK(channel_); | 209 DCHECK(channel_); |
193 uint32_t num_bytes; | 210 read_queue_->Read(num_bytes, |
194 ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); | 211 base::Bind(&WebSocketImpl::DidReadFromSendStream, |
195 std::vector<char> data(num_bytes); | 212 base::Unretained(this), |
196 ReadDataRaw(data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE); | 213 fin, type, num_bytes)); |
197 channel_->SendFrame( | |
198 fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), data); | |
199 } | 214 } |
200 | 215 |
201 void WebSocketImpl::FlowControl(int64_t quota) { | 216 void WebSocketImpl::FlowControl(int64_t quota) { |
202 DCHECK(channel_); | 217 DCHECK(channel_); |
203 channel_->SendFlowControl(quota); | 218 channel_->SendFlowControl(quota); |
204 } | 219 } |
205 | 220 |
206 void WebSocketImpl::Close(uint16_t code, const String& reason) { | 221 void WebSocketImpl::Close(uint16_t code, const String& reason) { |
207 DCHECK(channel_); | 222 DCHECK(channel_); |
208 channel_->StartClosingHandshake(code, reason); | 223 channel_->StartClosingHandshake(code, reason); |
209 } | 224 } |
210 | 225 |
| 226 void WebSocketImpl::DidReadFromSendStream(bool fin, |
| 227 WebSocket::MessageType type, |
| 228 uint32_t num_bytes, |
| 229 const char* data) { |
| 230 std::vector<char> buffer(num_bytes); |
| 231 memcpy(&buffer[0], data, num_bytes); |
| 232 DCHECK(channel_); |
| 233 channel_->SendFrame( |
| 234 fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), buffer); |
| 235 } |
| 236 |
211 } // namespace mojo | 237 } // namespace mojo |
OLD | NEW |