OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/memory/weak_ptr.h" | |
5 #include "base/message_loop/message_loop.h" | 6 #include "base/message_loop/message_loop.h" |
6 #include "base/rand_util.h" | 7 #include "base/rand_util.h" |
7 #include "chrome/browser/devtools/device/android_device_manager.h" | 8 #include "chrome/browser/devtools/device/android_device_manager.h" |
8 #include "content/public/browser/browser_thread.h" | 9 #include "content/public/browser/browser_thread.h" |
9 #include "net/base/io_buffer.h" | 10 #include "net/base/io_buffer.h" |
10 #include "net/base/net_errors.h" | 11 #include "net/base/net_errors.h" |
11 #include "net/server/web_socket.h" | 12 #include "net/server/web_socket.h" |
12 #include "net/socket/stream_socket.h" | 13 #include "net/socket/stream_socket.h" |
13 | 14 |
14 using content::BrowserThread; | 15 using content::BrowserThread; |
15 using net::WebSocket; | 16 using net::WebSocket; |
16 | 17 |
17 namespace { | 18 namespace { |
18 | 19 |
19 const int kBufferSize = 16 * 1024; | 20 const int kBufferSize = 16 * 1024; |
20 | 21 |
21 class WebSocketImpl : public AndroidDeviceManager::AndroidWebSocket { | 22 // Counterpart of AndroidWebSocketImpl existing on handler thread. |
pfeldman
2014/08/13 09:09:35
There is no mention of AndroidWebSocketImpl in thi
vkuzkokov
2014/08/13 10:00:06
Done.
| |
23 // Constructed on UI. All other members must be accessed on handler thread. | |
24 // Owned by corresponding AndroidWebSocketImpl and reports back to it via | |
25 // weak pointer. | |
26 class WebSocketImpl { | |
27 public: | |
28 typedef AndroidDeviceManager::AndroidWebSocket::Delegate Delegate; | |
29 | |
30 WebSocketImpl(base::WeakPtr<Delegate> weak_delegate, | |
31 scoped_refptr<base::MessageLoopProxy> delegate_message_loop, | |
32 scoped_ptr<net::StreamSocket> socket); | |
33 void StartListening(); | |
34 void SendFrame(const std::string& message); | |
35 | |
36 private: | |
37 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result); | |
38 void SendPendingRequests(int result); | |
39 void Disconnect(); | |
40 | |
41 base::WeakPtr<Delegate> weak_delegate_; | |
42 scoped_refptr<base::MessageLoopProxy> delegate_message_loop_; | |
43 scoped_ptr<net::StreamSocket> socket_; | |
44 std::string response_buffer_; | |
45 std::string request_buffer_; | |
46 base::ThreadChecker thread_checker_; | |
47 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl); | |
48 }; | |
49 | |
50 class AndroidWebSocketImpl | |
51 : public AndroidDeviceManager::AndroidWebSocket, | |
52 public AndroidDeviceManager::AndroidWebSocket::Delegate { | |
22 public: | 53 public: |
23 typedef AndroidDeviceManager::Device Device; | 54 typedef AndroidDeviceManager::Device Device; |
24 WebSocketImpl(scoped_refptr<base::MessageLoopProxy> device_message_loop, | 55 AndroidWebSocketImpl( |
25 scoped_refptr<Device> device, | 56 scoped_refptr<base::MessageLoopProxy> device_message_loop, |
26 const std::string& socket_name, | 57 scoped_refptr<Device> device, |
27 const std::string& url, | 58 const std::string& socket_name, |
28 Delegate* delegate); | 59 const std::string& url, |
60 AndroidWebSocket::Delegate* delegate); | |
29 | 61 |
30 virtual void Connect() OVERRIDE; | 62 virtual ~AndroidWebSocketImpl(); |
31 virtual void Disconnect() OVERRIDE; | 63 |
64 // AndroidWebSocket implementation | |
32 virtual void SendFrame(const std::string& message) OVERRIDE; | 65 virtual void SendFrame(const std::string& message) OVERRIDE; |
33 virtual void ClearDelegate() OVERRIDE; | 66 |
67 // AndroidWebSocket::Delegate implementation | |
68 void OnSocketOpened(); | |
69 void OnFrameRead(const std::string& message); | |
70 void OnSocketClosed(); | |
34 | 71 |
35 private: | 72 private: |
36 friend class base::RefCountedThreadSafe<AndroidWebSocket>; | 73 void Connected(int result, scoped_ptr<net::StreamSocket> socket); |
37 | |
38 virtual ~WebSocketImpl(); | |
39 | |
40 void Connected(int result, net::StreamSocket* socket); | |
41 void StartListeningOnHandlerThread(); | |
42 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result); | |
43 void SendFrameOnHandlerThread(const std::string& message); | |
44 void SendPendingRequests(int result); | |
45 void DisconnectOnHandlerThread(bool closed_by_device); | |
46 | |
47 void OnSocketOpened(); | |
48 void OnFrameRead(const std::string& message); | |
49 void OnSocketClosed(bool closed_by_device); | |
50 | 74 |
51 scoped_refptr<base::MessageLoopProxy> device_message_loop_; | 75 scoped_refptr<base::MessageLoopProxy> device_message_loop_; |
52 scoped_refptr<Device> device_; | 76 scoped_refptr<Device> device_; |
53 std::string socket_name_; | 77 std::string socket_name_; |
54 std::string url_; | 78 std::string url_; |
55 scoped_ptr<net::StreamSocket> socket_; | 79 WebSocketImpl* connection_; |
56 Delegate* delegate_; | 80 AndroidWebSocket::Delegate* delegate_; |
57 std::string response_buffer_; | 81 base::WeakPtrFactory<AndroidWebSocketImpl> weak_factory_; |
58 std::string request_buffer_; | 82 DISALLOW_COPY_AND_ASSIGN(AndroidWebSocketImpl); |
59 }; | 83 }; |
60 | 84 |
61 WebSocketImpl::WebSocketImpl( | 85 AndroidWebSocketImpl::AndroidWebSocketImpl( |
62 scoped_refptr<base::MessageLoopProxy> device_message_loop, | 86 scoped_refptr<base::MessageLoopProxy> device_message_loop, |
63 scoped_refptr<Device> device, | 87 scoped_refptr<Device> device, |
64 const std::string& socket_name, | 88 const std::string& socket_name, |
65 const std::string& url, | 89 const std::string& url, |
66 Delegate* delegate) | 90 AndroidWebSocket::Delegate* delegate) |
67 : device_message_loop_(device_message_loop), | 91 : device_message_loop_(device_message_loop), |
68 device_(device), | 92 device_(device), |
69 socket_name_(socket_name), | 93 socket_name_(socket_name), |
70 url_(url), | 94 url_(url), |
71 delegate_(delegate) { | 95 delegate_(delegate), |
96 weak_factory_(this) { | |
97 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
98 DCHECK(delegate_); | |
99 device_->HttpUpgrade( | |
100 socket_name_, url_, | |
101 base::Bind(&AndroidWebSocketImpl::Connected, weak_factory_.GetWeakPtr())); | |
72 } | 102 } |
73 | 103 |
74 void WebSocketImpl::Connect() { | 104 void AndroidWebSocketImpl::SendFrame(const std::string& message) { |
75 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
76 device_->HttpUpgrade( | |
77 socket_name_, url_, base::Bind(&WebSocketImpl::Connected, this)); | |
78 } | |
79 | |
80 void WebSocketImpl::Disconnect() { | |
81 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 105 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
82 device_message_loop_->PostTask( | 106 device_message_loop_->PostTask( |
83 FROM_HERE, | 107 FROM_HERE, |
84 base::Bind(&WebSocketImpl::DisconnectOnHandlerThread, this, false)); | 108 base::Bind(&WebSocketImpl::SendFrame, |
109 base::Unretained(connection_), message)); | |
85 } | 110 } |
86 | 111 |
87 void WebSocketImpl::SendFrame(const std::string& message) { | 112 void WebSocketImpl::SendFrame(const std::string& message) { |
88 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 113 DCHECK(thread_checker_.CalledOnValidThread()); |
89 device_message_loop_->PostTask( | 114 if (!socket_) |
90 FROM_HERE, | 115 return; |
91 base::Bind(&WebSocketImpl::SendFrameOnHandlerThread, this, message)); | |
92 } | |
93 | |
94 void WebSocketImpl::ClearDelegate() { | |
95 delegate_ = NULL; | |
96 } | |
97 | |
98 void WebSocketImpl::SendFrameOnHandlerThread(const std::string& message) { | |
99 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | |
100 int mask = base::RandInt(0, 0x7FFFFFFF); | 116 int mask = base::RandInt(0, 0x7FFFFFFF); |
101 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask); | 117 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask); |
102 request_buffer_ += encoded_frame; | 118 request_buffer_ += encoded_frame; |
103 if (request_buffer_.length() == encoded_frame.length()) | 119 if (request_buffer_.length() == encoded_frame.length()) |
104 SendPendingRequests(0); | 120 SendPendingRequests(0); |
105 } | 121 } |
106 | 122 |
107 WebSocketImpl::~WebSocketImpl() { | 123 AndroidWebSocketImpl::~AndroidWebSocketImpl() { |
108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 124 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
125 device_message_loop_->DeleteSoon(FROM_HERE, connection_); | |
109 } | 126 } |
110 | 127 |
111 void WebSocketImpl::Connected(int result, net::StreamSocket* socket) { | 128 WebSocketImpl::WebSocketImpl( |
129 base::WeakPtr<Delegate> weak_delegate, | |
130 scoped_refptr<base::MessageLoopProxy> delegate_message_loop, | |
131 scoped_ptr<net::StreamSocket> socket) | |
132 : weak_delegate_(weak_delegate), | |
133 delegate_message_loop_(delegate_message_loop), | |
134 socket_(socket.Pass()) { | |
135 } | |
136 | |
137 void AndroidWebSocketImpl::Connected(int result, | |
138 scoped_ptr<net::StreamSocket> socket) { | |
pfeldman
2014/08/13 09:09:35
Poor indent
vkuzkokov
2014/08/13 10:00:06
Done.
| |
112 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 139 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
113 if (result != net::OK || socket == NULL) { | 140 if (result != net::OK || socket == NULL) { |
114 OnSocketClosed(true); | 141 OnSocketClosed(); |
115 return; | 142 return; |
116 } | 143 } |
117 socket_.reset(socket); | 144 connection_ = new WebSocketImpl(weak_factory_.GetWeakPtr(), |
145 base::MessageLoopProxy::current(), | |
146 socket.Pass()); | |
118 device_message_loop_->PostTask( | 147 device_message_loop_->PostTask( |
119 FROM_HERE, | 148 FROM_HERE, |
120 base::Bind(&WebSocketImpl::StartListeningOnHandlerThread, this)); | 149 base::Bind(&WebSocketImpl::StartListening, |
150 base::Unretained(connection_))); | |
121 OnSocketOpened(); | 151 OnSocketOpened(); |
122 } | 152 } |
123 | 153 |
124 void WebSocketImpl::StartListeningOnHandlerThread() { | 154 void WebSocketImpl::StartListening() { |
125 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | 155 DCHECK(thread_checker_.CalledOnValidThread()); |
156 DCHECK(socket_); | |
126 scoped_refptr<net::IOBuffer> response_buffer = | 157 scoped_refptr<net::IOBuffer> response_buffer = |
127 new net::IOBuffer(kBufferSize); | 158 new net::IOBuffer(kBufferSize); |
128 int result = socket_->Read( | 159 int result = socket_->Read( |
129 response_buffer.get(), | 160 response_buffer.get(), |
130 kBufferSize, | 161 kBufferSize, |
131 base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer)); | 162 base::Bind(&WebSocketImpl::OnBytesRead, |
163 base::Unretained(this), response_buffer)); | |
132 if (result != net::ERR_IO_PENDING) | 164 if (result != net::ERR_IO_PENDING) |
133 OnBytesRead(response_buffer, result); | 165 OnBytesRead(response_buffer, result); |
134 } | 166 } |
135 | 167 |
136 void WebSocketImpl::OnBytesRead( | 168 void WebSocketImpl::OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, |
137 scoped_refptr<net::IOBuffer> response_buffer, int result) { | 169 int result) { |
138 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | 170 DCHECK(thread_checker_.CalledOnValidThread()); |
139 if (!socket_) | |
140 return; | |
141 | |
142 if (result <= 0) { | 171 if (result <= 0) { |
143 DisconnectOnHandlerThread(true); | 172 Disconnect(); |
144 return; | 173 return; |
145 } | 174 } |
146 | 175 |
147 std::string data = std::string(response_buffer->data(), result); | 176 std::string data = std::string(response_buffer->data(), result); |
148 response_buffer_ += data; | 177 response_buffer_ += data; |
149 | 178 |
150 int bytes_consumed; | 179 int bytes_consumed; |
151 std::string output; | 180 std::string output; |
152 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17( | 181 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17( |
153 response_buffer_, false, &bytes_consumed, &output); | 182 response_buffer_, false, &bytes_consumed, &output); |
154 | 183 |
155 while (parse_result == WebSocket::FRAME_OK) { | 184 while (parse_result == WebSocket::FRAME_OK) { |
156 response_buffer_ = response_buffer_.substr(bytes_consumed); | 185 response_buffer_ = response_buffer_.substr(bytes_consumed); |
157 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, | 186 delegate_message_loop_->PostTask(FROM_HERE, |
pfeldman
2014/08/13 09:09:35
Threading should only be handled in the AndroidWeb
vkuzkokov
2014/08/13 10:00:06
Rather it shouldn't be handled in WebSocketImpl. W
| |
158 base::Bind(&WebSocketImpl::OnFrameRead, this, output)); | 187 base::Bind(&Delegate::OnFrameRead, weak_delegate_, output)); |
159 parse_result = WebSocket::DecodeFrameHybi17( | 188 parse_result = WebSocket::DecodeFrameHybi17( |
160 response_buffer_, false, &bytes_consumed, &output); | 189 response_buffer_, false, &bytes_consumed, &output); |
161 } | 190 } |
162 | 191 |
163 if (parse_result == WebSocket::FRAME_ERROR || | 192 if (parse_result == WebSocket::FRAME_ERROR || |
164 parse_result == WebSocket::FRAME_CLOSE) { | 193 parse_result == WebSocket::FRAME_CLOSE) { |
165 DisconnectOnHandlerThread(true); | 194 Disconnect(); |
166 return; | 195 return; |
167 } | 196 } |
168 | 197 |
169 result = socket_->Read( | 198 result = socket_->Read( |
170 response_buffer.get(), | 199 response_buffer.get(), |
171 kBufferSize, | 200 kBufferSize, |
172 base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer)); | 201 base::Bind(&WebSocketImpl::OnBytesRead, |
202 base::Unretained(this), response_buffer)); | |
173 if (result != net::ERR_IO_PENDING) | 203 if (result != net::ERR_IO_PENDING) |
174 OnBytesRead(response_buffer, result); | 204 OnBytesRead(response_buffer, result); |
175 } | 205 } |
176 | 206 |
177 void WebSocketImpl::SendPendingRequests(int result) { | 207 void WebSocketImpl::SendPendingRequests(int result) { |
178 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | 208 DCHECK(thread_checker_.CalledOnValidThread()); |
179 if (!socket_) | |
180 return; | |
181 if (result < 0) { | 209 if (result < 0) { |
182 DisconnectOnHandlerThread(true); | 210 Disconnect(); |
183 return; | 211 return; |
184 } | 212 } |
185 request_buffer_ = request_buffer_.substr(result); | 213 request_buffer_ = request_buffer_.substr(result); |
186 if (request_buffer_.empty()) | 214 if (request_buffer_.empty()) |
187 return; | 215 return; |
188 | 216 |
189 scoped_refptr<net::StringIOBuffer> buffer = | 217 scoped_refptr<net::StringIOBuffer> buffer = |
190 new net::StringIOBuffer(request_buffer_); | 218 new net::StringIOBuffer(request_buffer_); |
191 result = socket_->Write(buffer.get(), buffer->size(), | 219 result = socket_->Write(buffer.get(), buffer->size(), |
192 base::Bind(&WebSocketImpl::SendPendingRequests, | 220 base::Bind(&WebSocketImpl::SendPendingRequests, |
193 this)); | 221 base::Unretained(this))); |
194 if (result != net::ERR_IO_PENDING) | 222 if (result != net::ERR_IO_PENDING) |
195 SendPendingRequests(result); | 223 SendPendingRequests(result); |
196 } | 224 } |
197 | 225 |
198 void WebSocketImpl::DisconnectOnHandlerThread(bool closed_by_device) { | 226 void WebSocketImpl::Disconnect() { |
199 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | 227 DCHECK(thread_checker_.CalledOnValidThread()); |
200 if (!socket_) | 228 socket_.reset(); |
201 return; | 229 delegate_message_loop_->PostTask(FROM_HERE, |
pfeldman
2014/08/13 09:09:35
ditto
vkuzkokov
2014/08/13 10:00:06
Ditto.
| |
202 // Wipe out socket_ first since Disconnect can re-enter this method. | 230 base::Bind(&Delegate::OnSocketClosed, weak_delegate_)); |
203 scoped_ptr<net::StreamSocket> socket(socket_.release()); | |
204 socket->Disconnect(); | |
205 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, | |
206 base::Bind(&WebSocketImpl::OnSocketClosed, this, closed_by_device)); | |
207 } | 231 } |
208 | 232 |
209 void WebSocketImpl::OnSocketOpened() { | 233 void AndroidWebSocketImpl::OnSocketOpened() { |
210 if (delegate_) | 234 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
211 delegate_->OnSocketOpened(); | 235 delegate_->OnSocketOpened(); |
212 } | 236 } |
213 | 237 |
214 void WebSocketImpl::OnFrameRead(const std::string& message) { | 238 void AndroidWebSocketImpl::OnFrameRead(const std::string& message) { |
215 if (delegate_) | 239 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
216 delegate_->OnFrameRead(message); | 240 delegate_->OnFrameRead(message); |
217 } | 241 } |
218 | 242 |
219 void WebSocketImpl::OnSocketClosed(bool closed_by_device) { | 243 void AndroidWebSocketImpl::OnSocketClosed() { |
220 if (delegate_) | 244 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
221 delegate_->OnSocketClosed(closed_by_device); | 245 delegate_->OnSocketClosed(); |
222 } | 246 } |
223 | 247 |
224 } // namespace | 248 } // namespace |
225 | 249 |
226 scoped_refptr<AndroidDeviceManager::AndroidWebSocket> | 250 AndroidDeviceManager::AndroidWebSocket* |
227 AndroidDeviceManager::Device::CreateWebSocket( | 251 AndroidDeviceManager::Device::CreateWebSocket( |
228 const std::string& socket, | 252 const std::string& socket, |
229 const std::string& url, | 253 const std::string& url, |
230 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { | 254 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { |
231 return new WebSocketImpl(device_message_loop_, this, socket, url, delegate); | 255 return new AndroidWebSocketImpl( |
256 device_message_loop_, this, socket, url, delegate); | |
232 } | 257 } |
OLD | NEW |