OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/memory/weak_ptr.h" | |
6 #include "base/message_loop/message_loop.h" | 5 #include "base/message_loop/message_loop.h" |
7 #include "base/rand_util.h" | 6 #include "base/rand_util.h" |
8 #include "chrome/browser/devtools/device/android_device_manager.h" | 7 #include "chrome/browser/devtools/device/android_device_manager.h" |
9 #include "content/public/browser/browser_thread.h" | 8 #include "content/public/browser/browser_thread.h" |
10 #include "net/base/io_buffer.h" | 9 #include "net/base/io_buffer.h" |
11 #include "net/base/net_errors.h" | 10 #include "net/base/net_errors.h" |
12 #include "net/server/web_socket.h" | 11 #include "net/server/web_socket.h" |
13 #include "net/socket/stream_socket.h" | 12 #include "net/socket/stream_socket.h" |
14 | 13 |
15 using content::BrowserThread; | 14 using content::BrowserThread; |
16 using net::WebSocket; | 15 using net::WebSocket; |
17 | 16 |
18 namespace { | 17 namespace { |
19 | 18 |
20 const int kBufferSize = 16 * 1024; | 19 const int kBufferSize = 16 * 1024; |
21 | 20 |
22 class WebSocketImpl { | 21 class WebSocketImpl : public AndroidDeviceManager::AndroidWebSocket { |
23 public: | 22 public: |
24 typedef AndroidDeviceManager::AndroidWebSocket::Delegate Delegate; | 23 typedef AndroidDeviceManager::Device Device; |
| 24 WebSocketImpl(scoped_refptr<base::MessageLoopProxy> device_message_loop, |
| 25 scoped_refptr<Device> device, |
| 26 const std::string& socket_name, |
| 27 const std::string& url, |
| 28 Delegate* delegate); |
25 | 29 |
26 WebSocketImpl(Delegate* delegate, | 30 virtual void Connect() OVERRIDE; |
27 scoped_ptr<net::StreamSocket> socket); | 31 virtual void Disconnect() OVERRIDE; |
28 void StartListening(); | 32 virtual void SendFrame(const std::string& message) OVERRIDE; |
29 void SendFrame(const std::string& message); | 33 virtual void ClearDelegate() OVERRIDE; |
30 | 34 |
31 private: | 35 private: |
| 36 friend class base::RefCountedThreadSafe<AndroidWebSocket>; |
| 37 |
| 38 virtual ~WebSocketImpl(); |
| 39 |
| 40 void Connected(int result, net::StreamSocket* socket); |
| 41 void StartListeningOnHandlerThread(); |
32 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result); | 42 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result); |
| 43 void SendFrameOnHandlerThread(const std::string& message); |
33 void SendPendingRequests(int result); | 44 void SendPendingRequests(int result); |
34 void Disconnect(); | 45 void DisconnectOnHandlerThread(bool closed_by_device); |
35 | 46 |
36 Delegate* delegate_; | 47 void OnSocketOpened(); |
37 scoped_ptr<net::StreamSocket> socket_; | 48 void OnFrameRead(const std::string& message); |
38 std::string response_buffer_; | 49 void OnSocketClosed(bool closed_by_device); |
39 std::string request_buffer_; | |
40 base::ThreadChecker thread_checker_; | |
41 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl); | |
42 }; | |
43 | |
44 class DelegateWrapper | |
45 : public AndroidDeviceManager::AndroidWebSocket::Delegate { | |
46 public: | |
47 DelegateWrapper(base::WeakPtr<Delegate> weak_delegate, | |
48 scoped_refptr<base::MessageLoopProxy> message_loop) | |
49 : weak_delegate_(weak_delegate), | |
50 message_loop_(message_loop) { | |
51 } | |
52 | |
53 virtual ~DelegateWrapper() {} | |
54 | |
55 // AndroidWebSocket::Delegate implementation | |
56 virtual void OnSocketOpened() OVERRIDE { | |
57 message_loop_->PostTask(FROM_HERE, | |
58 base::Bind(&Delegate::OnSocketOpened, weak_delegate_)); | |
59 } | |
60 | |
61 virtual void OnFrameRead(const std::string& message) OVERRIDE { | |
62 message_loop_->PostTask(FROM_HERE, | |
63 base::Bind(&Delegate::OnFrameRead, weak_delegate_, message)); | |
64 } | |
65 | |
66 virtual void OnSocketClosed() OVERRIDE { | |
67 message_loop_->PostTask(FROM_HERE, | |
68 base::Bind(&Delegate::OnSocketClosed, weak_delegate_)); | |
69 } | |
70 | |
71 private: | |
72 base::WeakPtr<Delegate> weak_delegate_; | |
73 scoped_refptr<base::MessageLoopProxy> message_loop_; | |
74 }; | |
75 | |
76 class AndroidWebSocketImpl | |
77 : public AndroidDeviceManager::AndroidWebSocket, | |
78 public AndroidDeviceManager::AndroidWebSocket::Delegate { | |
79 public: | |
80 typedef AndroidDeviceManager::Device Device; | |
81 AndroidWebSocketImpl( | |
82 scoped_refptr<base::MessageLoopProxy> device_message_loop, | |
83 scoped_refptr<Device> device, | |
84 const std::string& socket_name, | |
85 const std::string& url, | |
86 AndroidWebSocket::Delegate* delegate); | |
87 | |
88 virtual ~AndroidWebSocketImpl(); | |
89 | |
90 // AndroidWebSocket implementation | |
91 virtual void SendFrame(const std::string& message) OVERRIDE; | |
92 | |
93 // AndroidWebSocket::Delegate implementation | |
94 virtual void OnSocketOpened() OVERRIDE; | |
95 virtual void OnFrameRead(const std::string& message) OVERRIDE; | |
96 virtual void OnSocketClosed() OVERRIDE; | |
97 | |
98 private: | |
99 void Connected(int result, scoped_ptr<net::StreamSocket> socket); | |
100 | 50 |
101 scoped_refptr<base::MessageLoopProxy> device_message_loop_; | 51 scoped_refptr<base::MessageLoopProxy> device_message_loop_; |
102 scoped_refptr<Device> device_; | 52 scoped_refptr<Device> device_; |
103 std::string socket_name_; | 53 std::string socket_name_; |
104 std::string url_; | 54 std::string url_; |
105 WebSocketImpl* connection_; | 55 scoped_ptr<net::StreamSocket> socket_; |
106 DelegateWrapper* delegate_wrapper_; | 56 Delegate* delegate_; |
107 AndroidWebSocket::Delegate* delegate_; | 57 std::string response_buffer_; |
108 base::WeakPtrFactory<AndroidWebSocketImpl> weak_factory_; | 58 std::string request_buffer_; |
109 DISALLOW_COPY_AND_ASSIGN(AndroidWebSocketImpl); | |
110 }; | 59 }; |
111 | 60 |
112 AndroidWebSocketImpl::AndroidWebSocketImpl( | 61 WebSocketImpl::WebSocketImpl( |
113 scoped_refptr<base::MessageLoopProxy> device_message_loop, | 62 scoped_refptr<base::MessageLoopProxy> device_message_loop, |
114 scoped_refptr<Device> device, | 63 scoped_refptr<Device> device, |
115 const std::string& socket_name, | 64 const std::string& socket_name, |
116 const std::string& url, | 65 const std::string& url, |
117 AndroidWebSocket::Delegate* delegate) | 66 Delegate* delegate) |
118 : device_message_loop_(device_message_loop), | 67 : device_message_loop_(device_message_loop), |
119 device_(device), | 68 device_(device), |
120 socket_name_(socket_name), | 69 socket_name_(socket_name), |
121 url_(url), | 70 url_(url), |
122 delegate_(delegate), | 71 delegate_(delegate) { |
123 weak_factory_(this) { | |
124 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
125 DCHECK(delegate_); | |
126 device_->HttpUpgrade( | |
127 socket_name_, url_, | |
128 base::Bind(&AndroidWebSocketImpl::Connected, weak_factory_.GetWeakPtr())); | |
129 } | 72 } |
130 | 73 |
131 void AndroidWebSocketImpl::SendFrame(const std::string& message) { | 74 void WebSocketImpl::Connect() { |
| 75 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
| 76 device_->HttpUpgrade( |
| 77 socket_name_, url_, base::Bind(&WebSocketImpl::Connected, this)); |
| 78 } |
| 79 |
| 80 void WebSocketImpl::Disconnect() { |
132 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 81 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
133 device_message_loop_->PostTask( | 82 device_message_loop_->PostTask( |
134 FROM_HERE, | 83 FROM_HERE, |
135 base::Bind(&WebSocketImpl::SendFrame, | 84 base::Bind(&WebSocketImpl::DisconnectOnHandlerThread, this, false)); |
136 base::Unretained(connection_), message)); | |
137 } | 85 } |
138 | 86 |
139 void WebSocketImpl::SendFrame(const std::string& message) { | 87 void WebSocketImpl::SendFrame(const std::string& message) { |
140 DCHECK(thread_checker_.CalledOnValidThread()); | 88 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
141 if (!socket_) | 89 device_message_loop_->PostTask( |
142 return; | 90 FROM_HERE, |
| 91 base::Bind(&WebSocketImpl::SendFrameOnHandlerThread, this, message)); |
| 92 } |
| 93 |
| 94 void WebSocketImpl::ClearDelegate() { |
| 95 delegate_ = NULL; |
| 96 } |
| 97 |
| 98 void WebSocketImpl::SendFrameOnHandlerThread(const std::string& message) { |
| 99 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); |
143 int mask = base::RandInt(0, 0x7FFFFFFF); | 100 int mask = base::RandInt(0, 0x7FFFFFFF); |
144 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask); | 101 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask); |
145 request_buffer_ += encoded_frame; | 102 request_buffer_ += encoded_frame; |
146 if (request_buffer_.length() == encoded_frame.length()) | 103 if (request_buffer_.length() == encoded_frame.length()) |
147 SendPendingRequests(0); | 104 SendPendingRequests(0); |
148 } | 105 } |
149 | 106 |
150 AndroidWebSocketImpl::~AndroidWebSocketImpl() { | 107 WebSocketImpl::~WebSocketImpl() { |
151 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
152 device_message_loop_->DeleteSoon(FROM_HERE, connection_); | |
153 device_message_loop_->DeleteSoon(FROM_HERE, delegate_wrapper_); | |
154 } | 109 } |
155 | 110 |
156 WebSocketImpl::WebSocketImpl(Delegate* delegate, | 111 void WebSocketImpl::Connected(int result, net::StreamSocket* socket) { |
157 scoped_ptr<net::StreamSocket> socket) | |
158 : delegate_(delegate), | |
159 socket_(socket.Pass()) { | |
160 thread_checker_.DetachFromThread(); | |
161 } | |
162 | |
163 void AndroidWebSocketImpl::Connected(int result, | |
164 scoped_ptr<net::StreamSocket> socket) { | |
165 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 112 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
166 if (result != net::OK || socket == NULL) { | 113 if (result != net::OK || socket == NULL) { |
167 OnSocketClosed(); | 114 OnSocketClosed(true); |
168 return; | 115 return; |
169 } | 116 } |
170 delegate_wrapper_ = new DelegateWrapper(weak_factory_.GetWeakPtr(), | 117 socket_.reset(socket); |
171 base::MessageLoopProxy::current()); | |
172 connection_ = new WebSocketImpl(delegate_wrapper_, socket.Pass()); | |
173 device_message_loop_->PostTask( | 118 device_message_loop_->PostTask( |
174 FROM_HERE, | 119 FROM_HERE, |
175 base::Bind(&WebSocketImpl::StartListening, | 120 base::Bind(&WebSocketImpl::StartListeningOnHandlerThread, this)); |
176 base::Unretained(connection_))); | |
177 OnSocketOpened(); | 121 OnSocketOpened(); |
178 } | 122 } |
179 | 123 |
180 void WebSocketImpl::StartListening() { | 124 void WebSocketImpl::StartListeningOnHandlerThread() { |
181 DCHECK(thread_checker_.CalledOnValidThread()); | 125 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); |
182 DCHECK(socket_); | |
183 scoped_refptr<net::IOBuffer> response_buffer = | 126 scoped_refptr<net::IOBuffer> response_buffer = |
184 new net::IOBuffer(kBufferSize); | 127 new net::IOBuffer(kBufferSize); |
185 int result = socket_->Read( | 128 int result = socket_->Read( |
186 response_buffer.get(), | 129 response_buffer.get(), |
187 kBufferSize, | 130 kBufferSize, |
188 base::Bind(&WebSocketImpl::OnBytesRead, | 131 base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer)); |
189 base::Unretained(this), response_buffer)); | |
190 if (result != net::ERR_IO_PENDING) | 132 if (result != net::ERR_IO_PENDING) |
191 OnBytesRead(response_buffer, result); | 133 OnBytesRead(response_buffer, result); |
192 } | 134 } |
193 | 135 |
194 void WebSocketImpl::OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, | 136 void WebSocketImpl::OnBytesRead( |
195 int result) { | 137 scoped_refptr<net::IOBuffer> response_buffer, int result) { |
196 DCHECK(thread_checker_.CalledOnValidThread()); | 138 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); |
| 139 if (!socket_) |
| 140 return; |
| 141 |
197 if (result <= 0) { | 142 if (result <= 0) { |
198 Disconnect(); | 143 DisconnectOnHandlerThread(true); |
199 return; | 144 return; |
200 } | 145 } |
201 | 146 |
202 std::string data = std::string(response_buffer->data(), result); | 147 std::string data = std::string(response_buffer->data(), result); |
203 response_buffer_ += data; | 148 response_buffer_ += data; |
204 | 149 |
205 int bytes_consumed; | 150 int bytes_consumed; |
206 std::string output; | 151 std::string output; |
207 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17( | 152 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17( |
208 response_buffer_, false, &bytes_consumed, &output); | 153 response_buffer_, false, &bytes_consumed, &output); |
209 | 154 |
210 while (parse_result == WebSocket::FRAME_OK) { | 155 while (parse_result == WebSocket::FRAME_OK) { |
211 response_buffer_ = response_buffer_.substr(bytes_consumed); | 156 response_buffer_ = response_buffer_.substr(bytes_consumed); |
212 delegate_->OnFrameRead(output); | 157 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, |
| 158 base::Bind(&WebSocketImpl::OnFrameRead, this, output)); |
213 parse_result = WebSocket::DecodeFrameHybi17( | 159 parse_result = WebSocket::DecodeFrameHybi17( |
214 response_buffer_, false, &bytes_consumed, &output); | 160 response_buffer_, false, &bytes_consumed, &output); |
215 } | 161 } |
216 | 162 |
217 if (parse_result == WebSocket::FRAME_ERROR || | 163 if (parse_result == WebSocket::FRAME_ERROR || |
218 parse_result == WebSocket::FRAME_CLOSE) { | 164 parse_result == WebSocket::FRAME_CLOSE) { |
219 Disconnect(); | 165 DisconnectOnHandlerThread(true); |
220 return; | 166 return; |
221 } | 167 } |
222 | 168 |
223 result = socket_->Read( | 169 result = socket_->Read( |
224 response_buffer.get(), | 170 response_buffer.get(), |
225 kBufferSize, | 171 kBufferSize, |
226 base::Bind(&WebSocketImpl::OnBytesRead, | 172 base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer)); |
227 base::Unretained(this), response_buffer)); | |
228 if (result != net::ERR_IO_PENDING) | 173 if (result != net::ERR_IO_PENDING) |
229 OnBytesRead(response_buffer, result); | 174 OnBytesRead(response_buffer, result); |
230 } | 175 } |
231 | 176 |
232 void WebSocketImpl::SendPendingRequests(int result) { | 177 void WebSocketImpl::SendPendingRequests(int result) { |
233 DCHECK(thread_checker_.CalledOnValidThread()); | 178 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); |
| 179 if (!socket_) |
| 180 return; |
234 if (result < 0) { | 181 if (result < 0) { |
235 Disconnect(); | 182 DisconnectOnHandlerThread(true); |
236 return; | 183 return; |
237 } | 184 } |
238 request_buffer_ = request_buffer_.substr(result); | 185 request_buffer_ = request_buffer_.substr(result); |
239 if (request_buffer_.empty()) | 186 if (request_buffer_.empty()) |
240 return; | 187 return; |
241 | 188 |
242 scoped_refptr<net::StringIOBuffer> buffer = | 189 scoped_refptr<net::StringIOBuffer> buffer = |
243 new net::StringIOBuffer(request_buffer_); | 190 new net::StringIOBuffer(request_buffer_); |
244 result = socket_->Write(buffer.get(), buffer->size(), | 191 result = socket_->Write(buffer.get(), buffer->size(), |
245 base::Bind(&WebSocketImpl::SendPendingRequests, | 192 base::Bind(&WebSocketImpl::SendPendingRequests, |
246 base::Unretained(this))); | 193 this)); |
247 if (result != net::ERR_IO_PENDING) | 194 if (result != net::ERR_IO_PENDING) |
248 SendPendingRequests(result); | 195 SendPendingRequests(result); |
249 } | 196 } |
250 | 197 |
251 void WebSocketImpl::Disconnect() { | 198 void WebSocketImpl::DisconnectOnHandlerThread(bool closed_by_device) { |
252 DCHECK(thread_checker_.CalledOnValidThread()); | 199 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); |
253 socket_.reset(); | 200 if (!socket_) |
254 delegate_->OnSocketClosed(); | 201 return; |
| 202 // Wipe out socket_ first since Disconnect can re-enter this method. |
| 203 scoped_ptr<net::StreamSocket> socket(socket_.release()); |
| 204 socket->Disconnect(); |
| 205 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, |
| 206 base::Bind(&WebSocketImpl::OnSocketClosed, this, closed_by_device)); |
255 } | 207 } |
256 | 208 |
257 void AndroidWebSocketImpl::OnSocketOpened() { | 209 void WebSocketImpl::OnSocketOpened() { |
258 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 210 if (delegate_) |
259 delegate_->OnSocketOpened(); | 211 delegate_->OnSocketOpened(); |
260 } | 212 } |
261 | 213 |
262 void AndroidWebSocketImpl::OnFrameRead(const std::string& message) { | 214 void WebSocketImpl::OnFrameRead(const std::string& message) { |
263 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 215 if (delegate_) |
264 delegate_->OnFrameRead(message); | 216 delegate_->OnFrameRead(message); |
265 } | 217 } |
266 | 218 |
267 void AndroidWebSocketImpl::OnSocketClosed() { | 219 void WebSocketImpl::OnSocketClosed(bool closed_by_device) { |
268 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 220 if (delegate_) |
269 delegate_->OnSocketClosed(); | 221 delegate_->OnSocketClosed(closed_by_device); |
270 } | 222 } |
271 | 223 |
272 } // namespace | 224 } // namespace |
273 | 225 |
274 AndroidDeviceManager::AndroidWebSocket* | 226 scoped_refptr<AndroidDeviceManager::AndroidWebSocket> |
275 AndroidDeviceManager::Device::CreateWebSocket( | 227 AndroidDeviceManager::Device::CreateWebSocket( |
276 const std::string& socket, | 228 const std::string& socket, |
277 const std::string& url, | 229 const std::string& url, |
278 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { | 230 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { |
279 return new AndroidWebSocketImpl( | 231 return new WebSocketImpl(device_message_loop_, this, socket, url, delegate); |
280 device_message_loop_, this, socket, url, delegate); | |
281 } | 232 } |
OLD | NEW |