Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/memory/weak_ptr.h" | |
| 5 #include "base/message_loop/message_loop.h" | 6 #include "base/message_loop/message_loop.h" |
| 6 #include "base/rand_util.h" | 7 #include "base/rand_util.h" |
| 7 #include "chrome/browser/devtools/device/android_device_manager.h" | 8 #include "chrome/browser/devtools/device/android_device_manager.h" |
| 8 #include "content/public/browser/browser_thread.h" | 9 #include "content/public/browser/browser_thread.h" |
| 9 #include "net/base/io_buffer.h" | 10 #include "net/base/io_buffer.h" |
| 10 #include "net/base/net_errors.h" | 11 #include "net/base/net_errors.h" |
| 11 #include "net/server/web_socket.h" | 12 #include "net/server/web_socket.h" |
| 12 #include "net/socket/stream_socket.h" | 13 #include "net/socket/stream_socket.h" |
| 13 | 14 |
| 14 using content::BrowserThread; | 15 using content::BrowserThread; |
| 15 using net::WebSocket; | 16 using net::WebSocket; |
| 16 | 17 |
| 17 namespace { | 18 namespace { |
| 18 | 19 |
| 19 const int kBufferSize = 16 * 1024; | 20 const int kBufferSize = 16 * 1024; |
| 20 | 21 |
| 21 class WebSocketImpl : public AndroidDeviceManager::AndroidWebSocket { | 22 class WebSocketImpl : public AndroidDeviceManager::AndroidWebSocket { |
| 22 public: | 23 public: |
| 23 typedef AndroidDeviceManager::Device Device; | 24 typedef AndroidDeviceManager::Device Device; |
| 24 WebSocketImpl(scoped_refptr<base::MessageLoopProxy> device_message_loop, | 25 WebSocketImpl(scoped_refptr<base::MessageLoopProxy> device_message_loop, |
| 25 scoped_refptr<Device> device, | 26 scoped_refptr<Device> device, |
| 26 const std::string& socket_name, | 27 const std::string& socket_name, |
| 27 const std::string& url, | 28 const std::string& url, |
| 28 Delegate* delegate); | 29 Delegate* delegate); |
| 29 | 30 |
| 30 virtual void Connect() OVERRIDE; | |
| 31 virtual void Disconnect() OVERRIDE; | |
| 32 virtual void SendFrame(const std::string& message) OVERRIDE; | 31 virtual void SendFrame(const std::string& message) OVERRIDE; |
| 33 virtual void ClearDelegate() OVERRIDE; | 32 virtual ~WebSocketImpl(); |
| 34 | 33 |
| 35 private: | 34 private: |
| 36 friend class base::RefCountedThreadSafe<AndroidWebSocket>; | |
| 37 | 35 |
| 38 virtual ~WebSocketImpl(); | 36 // Counterpart of WebSocketImpl existing on handler thread. Constructed on UI. |
| 37 // All other members must be accessed on handler thread. Owned by | |
| 38 // corresponding WebSocketImpl and reports back to it via weak pointer. | |
| 39 class Connection { | |
|
pfeldman
2014/08/11 11:41:38
Lets inherit from NonThreadSafe?
vkuzkokov
2014/08/11 14:40:03
NonThreadSafe is not recommended in this case. Add
| |
| 40 public: | |
| 41 Connection(base::WeakPtr<WebSocketImpl> weak_web_socket, | |
| 42 net::StreamSocket* socket); | |
| 43 void StartListening(); | |
| 44 void SendFrame(const std::string& message); | |
|
pfeldman
2014/08/11 11:41:38
Blink line below.
vkuzkokov
2014/08/11 14:40:03
Done.
| |
| 45 private: | |
| 46 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result); | |
| 47 void SendPendingRequests(int result); | |
| 48 void Disconnect(); | |
| 49 base::WeakPtr<WebSocketImpl> weak_web_socket_; | |
| 50 scoped_ptr<net::StreamSocket> socket_; | |
| 51 std::string response_buffer_; | |
| 52 std::string request_buffer_; | |
| 53 DISALLOW_COPY_AND_ASSIGN(Connection); | |
| 54 }; | |
| 39 | 55 |
| 56 static void ConnectedWeak(base::WeakPtr<WebSocketImpl> weak_web_socket, | |
| 57 int result, net::StreamSocket* socket); | |
| 40 void Connected(int result, net::StreamSocket* socket); | 58 void Connected(int result, net::StreamSocket* socket); |
| 41 void StartListeningOnHandlerThread(); | |
| 42 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result); | |
| 43 void SendFrameOnHandlerThread(const std::string& message); | |
| 44 void SendPendingRequests(int result); | |
| 45 void DisconnectOnHandlerThread(bool closed_by_device); | |
| 46 | 59 |
| 47 void OnSocketOpened(); | 60 void OnSocketOpened(); |
| 48 void OnFrameRead(const std::string& message); | 61 void OnFrameRead(const std::string& message); |
| 49 void OnSocketClosed(bool closed_by_device); | 62 void OnSocketClosed(); |
| 50 | 63 |
| 51 scoped_refptr<base::MessageLoopProxy> device_message_loop_; | 64 scoped_refptr<base::MessageLoopProxy> device_message_loop_; |
| 52 scoped_refptr<Device> device_; | 65 scoped_refptr<Device> device_; |
| 53 std::string socket_name_; | 66 std::string socket_name_; |
| 54 std::string url_; | 67 std::string url_; |
| 55 scoped_ptr<net::StreamSocket> socket_; | 68 Connection* connection_; |
| 56 Delegate* delegate_; | 69 Delegate* delegate_; |
| 57 std::string response_buffer_; | 70 base::WeakPtrFactory<WebSocketImpl> weak_factory_; |
| 58 std::string request_buffer_; | 71 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl); |
| 59 }; | 72 }; |
| 60 | 73 |
| 61 WebSocketImpl::WebSocketImpl( | 74 WebSocketImpl::WebSocketImpl( |
| 62 scoped_refptr<base::MessageLoopProxy> device_message_loop, | 75 scoped_refptr<base::MessageLoopProxy> device_message_loop, |
| 63 scoped_refptr<Device> device, | 76 scoped_refptr<Device> device, |
| 64 const std::string& socket_name, | 77 const std::string& socket_name, |
| 65 const std::string& url, | 78 const std::string& url, |
| 66 Delegate* delegate) | 79 Delegate* delegate) |
| 67 : device_message_loop_(device_message_loop), | 80 : device_message_loop_(device_message_loop), |
| 68 device_(device), | 81 device_(device), |
| 69 socket_name_(socket_name), | 82 socket_name_(socket_name), |
| 70 url_(url), | 83 url_(url), |
| 71 delegate_(delegate) { | 84 delegate_(delegate), |
| 72 } | 85 weak_factory_(this) { |
| 73 | 86 DCHECK(delegate_); |
| 74 void WebSocketImpl::Connect() { | |
| 75 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
| 76 device_->HttpUpgrade( | 87 device_->HttpUpgrade( |
| 77 socket_name_, url_, base::Bind(&WebSocketImpl::Connected, this)); | 88 socket_name_, url_, |
| 78 } | 89 base::Bind(&WebSocketImpl::ConnectedWeak, weak_factory_.GetWeakPtr())); |
| 79 | |
| 80 void WebSocketImpl::Disconnect() { | |
| 81 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
| 82 device_message_loop_->PostTask( | |
| 83 FROM_HERE, | |
| 84 base::Bind(&WebSocketImpl::DisconnectOnHandlerThread, this, false)); | |
| 85 } | 90 } |
| 86 | 91 |
| 87 void WebSocketImpl::SendFrame(const std::string& message) { | 92 void WebSocketImpl::SendFrame(const std::string& message) { |
| 88 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 93 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
| 89 device_message_loop_->PostTask( | 94 device_message_loop_->PostTask( |
| 90 FROM_HERE, | 95 FROM_HERE, |
| 91 base::Bind(&WebSocketImpl::SendFrameOnHandlerThread, this, message)); | 96 base::Bind(&Connection::SendFrame, |
| 97 base::Unretained(connection_), message)); | |
| 92 } | 98 } |
| 93 | 99 |
| 94 void WebSocketImpl::ClearDelegate() { | 100 void WebSocketImpl::Connection::SendFrame(const std::string& message) { |
| 95 delegate_ = NULL; | 101 if (!socket_) |
| 96 } | 102 return; |
| 97 | |
| 98 void WebSocketImpl::SendFrameOnHandlerThread(const std::string& message) { | |
| 99 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | |
| 100 int mask = base::RandInt(0, 0x7FFFFFFF); | 103 int mask = base::RandInt(0, 0x7FFFFFFF); |
| 101 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask); | 104 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask); |
| 102 request_buffer_ += encoded_frame; | 105 request_buffer_ += encoded_frame; |
| 103 if (request_buffer_.length() == encoded_frame.length()) | 106 if (request_buffer_.length() == encoded_frame.length()) |
| 104 SendPendingRequests(0); | 107 SendPendingRequests(0); |
| 105 } | 108 } |
| 106 | 109 |
| 107 WebSocketImpl::~WebSocketImpl() { | 110 WebSocketImpl::~WebSocketImpl() { |
| 108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 111 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
| 112 device_message_loop_->DeleteSoon(FROM_HERE, connection_); | |
| 109 } | 113 } |
| 110 | 114 |
| 111 void WebSocketImpl::Connected(int result, net::StreamSocket* socket) { | 115 WebSocketImpl::Connection::Connection( |
| 116 base::WeakPtr<WebSocketImpl> weak_web_socket, | |
| 117 net::StreamSocket* socket) | |
| 118 : weak_web_socket_(weak_web_socket), | |
| 119 socket_(socket) { | |
| 120 } | |
| 121 | |
| 122 // static | |
| 123 void WebSocketImpl::ConnectedWeak(base::WeakPtr<WebSocketImpl> weak_web_socket, | |
| 124 int result, net::StreamSocket* socket_raw) { | |
| 125 // The sole purpose of a static method here is to avoid leaking StreamSocket | |
|
pfeldman
2014/08/11 11:41:38
Doesn't base::Owned solve this for you?
vkuzkokov
2014/08/11 14:40:03
It doesn't: we don't want StreamSocket destroyed a
| |
| 126 // in case when requesting WebSocketImpl has already been destroyed. | |
| 112 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 127 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
| 128 scoped_ptr<net::StreamSocket> socket(socket_raw); | |
| 129 if (weak_web_socket) | |
| 130 weak_web_socket->Connected(result, socket.release()); | |
| 131 } | |
| 132 | |
| 133 void WebSocketImpl::Connected(int result, net::StreamSocket* socket_raw) { | |
|
pfeldman
2014/08/11 11:41:38
Please DCHECK the threads in all methods (here and
vkuzkokov
2014/08/11 14:40:03
Done.
| |
| 134 scoped_ptr<net::StreamSocket> socket(socket_raw); | |
| 113 if (result != net::OK || socket == NULL) { | 135 if (result != net::OK || socket == NULL) { |
| 114 OnSocketClosed(true); | 136 OnSocketClosed(); |
| 115 return; | 137 return; |
| 116 } | 138 } |
| 117 socket_.reset(socket); | 139 connection_ = new Connection(weak_factory_.GetWeakPtr(), socket.release()); |
| 118 device_message_loop_->PostTask( | 140 device_message_loop_->PostTask( |
| 119 FROM_HERE, | 141 FROM_HERE, |
| 120 base::Bind(&WebSocketImpl::StartListeningOnHandlerThread, this)); | 142 base::Bind(&Connection::StartListening, base::Unretained(connection_))); |
| 121 OnSocketOpened(); | 143 OnSocketOpened(); |
| 122 } | 144 } |
| 123 | 145 |
| 124 void WebSocketImpl::StartListeningOnHandlerThread() { | 146 void WebSocketImpl::Connection::StartListening() { |
| 125 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | 147 DCHECK(socket_); |
| 126 scoped_refptr<net::IOBuffer> response_buffer = | 148 scoped_refptr<net::IOBuffer> response_buffer = |
| 127 new net::IOBuffer(kBufferSize); | 149 new net::IOBuffer(kBufferSize); |
| 128 int result = socket_->Read( | 150 int result = socket_->Read( |
| 129 response_buffer.get(), | 151 response_buffer.get(), |
| 130 kBufferSize, | 152 kBufferSize, |
| 131 base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer)); | 153 base::Bind(&Connection::OnBytesRead, |
| 154 base::Unretained(this), response_buffer)); | |
| 132 if (result != net::ERR_IO_PENDING) | 155 if (result != net::ERR_IO_PENDING) |
| 133 OnBytesRead(response_buffer, result); | 156 OnBytesRead(response_buffer, result); |
| 134 } | 157 } |
| 135 | 158 |
| 136 void WebSocketImpl::OnBytesRead( | 159 void WebSocketImpl::Connection::OnBytesRead( |
| 137 scoped_refptr<net::IOBuffer> response_buffer, int result) { | 160 scoped_refptr<net::IOBuffer> response_buffer, int result) { |
| 138 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | |
| 139 if (!socket_) | |
| 140 return; | |
| 141 | |
| 142 if (result <= 0) { | 161 if (result <= 0) { |
| 143 DisconnectOnHandlerThread(true); | 162 Disconnect(); |
| 144 return; | 163 return; |
| 145 } | 164 } |
| 146 | 165 |
| 147 std::string data = std::string(response_buffer->data(), result); | 166 std::string data = std::string(response_buffer->data(), result); |
| 148 response_buffer_ += data; | 167 response_buffer_ += data; |
| 149 | 168 |
| 150 int bytes_consumed; | 169 int bytes_consumed; |
| 151 std::string output; | 170 std::string output; |
| 152 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17( | 171 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17( |
| 153 response_buffer_, false, &bytes_consumed, &output); | 172 response_buffer_, false, &bytes_consumed, &output); |
| 154 | 173 |
| 155 while (parse_result == WebSocket::FRAME_OK) { | 174 while (parse_result == WebSocket::FRAME_OK) { |
| 156 response_buffer_ = response_buffer_.substr(bytes_consumed); | 175 response_buffer_ = response_buffer_.substr(bytes_consumed); |
| 157 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, | 176 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, |
| 158 base::Bind(&WebSocketImpl::OnFrameRead, this, output)); | 177 base::Bind(&WebSocketImpl::OnFrameRead, weak_web_socket_, output)); |
| 159 parse_result = WebSocket::DecodeFrameHybi17( | 178 parse_result = WebSocket::DecodeFrameHybi17( |
| 160 response_buffer_, false, &bytes_consumed, &output); | 179 response_buffer_, false, &bytes_consumed, &output); |
| 161 } | 180 } |
| 162 | 181 |
| 163 if (parse_result == WebSocket::FRAME_ERROR || | 182 if (parse_result == WebSocket::FRAME_ERROR || |
| 164 parse_result == WebSocket::FRAME_CLOSE) { | 183 parse_result == WebSocket::FRAME_CLOSE) { |
| 165 DisconnectOnHandlerThread(true); | 184 Disconnect(); |
| 166 return; | 185 return; |
| 167 } | 186 } |
| 168 | 187 |
| 169 result = socket_->Read( | 188 result = socket_->Read( |
| 170 response_buffer.get(), | 189 response_buffer.get(), |
| 171 kBufferSize, | 190 kBufferSize, |
| 172 base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer)); | 191 base::Bind(&Connection::OnBytesRead, |
| 192 base::Unretained(this), response_buffer)); | |
| 173 if (result != net::ERR_IO_PENDING) | 193 if (result != net::ERR_IO_PENDING) |
| 174 OnBytesRead(response_buffer, result); | 194 OnBytesRead(response_buffer, result); |
| 175 } | 195 } |
| 176 | 196 |
| 177 void WebSocketImpl::SendPendingRequests(int result) { | 197 void WebSocketImpl::Connection::SendPendingRequests(int result) { |
| 178 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | |
| 179 if (!socket_) | |
| 180 return; | |
| 181 if (result < 0) { | 198 if (result < 0) { |
| 182 DisconnectOnHandlerThread(true); | 199 Disconnect(); |
| 183 return; | 200 return; |
| 184 } | 201 } |
| 185 request_buffer_ = request_buffer_.substr(result); | 202 request_buffer_ = request_buffer_.substr(result); |
| 186 if (request_buffer_.empty()) | 203 if (request_buffer_.empty()) |
| 187 return; | 204 return; |
| 188 | 205 |
| 189 scoped_refptr<net::StringIOBuffer> buffer = | 206 scoped_refptr<net::StringIOBuffer> buffer = |
| 190 new net::StringIOBuffer(request_buffer_); | 207 new net::StringIOBuffer(request_buffer_); |
| 191 result = socket_->Write(buffer.get(), buffer->size(), | 208 result = socket_->Write(buffer.get(), buffer->size(), |
| 192 base::Bind(&WebSocketImpl::SendPendingRequests, | 209 base::Bind(&Connection::SendPendingRequests, |
| 193 this)); | 210 base::Unretained(this))); |
| 194 if (result != net::ERR_IO_PENDING) | 211 if (result != net::ERR_IO_PENDING) |
| 195 SendPendingRequests(result); | 212 SendPendingRequests(result); |
| 196 } | 213 } |
| 197 | 214 |
| 198 void WebSocketImpl::DisconnectOnHandlerThread(bool closed_by_device) { | 215 void WebSocketImpl::Connection::Disconnect() { |
| 199 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); | 216 socket_.reset(); |
| 200 if (!socket_) | |
| 201 return; | |
| 202 // Wipe out socket_ first since Disconnect can re-enter this method. | |
| 203 scoped_ptr<net::StreamSocket> socket(socket_.release()); | |
| 204 socket->Disconnect(); | |
| 205 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, | 217 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, |
| 206 base::Bind(&WebSocketImpl::OnSocketClosed, this, closed_by_device)); | 218 base::Bind(&WebSocketImpl::OnSocketClosed, weak_web_socket_)); |
|
pfeldman
2014/08/11 11:41:38
weak_web_socket_ could be pointing to the garbage
vkuzkokov
2014/08/11 14:40:03
That's why we use weak pointer.
| |
| 207 } | 219 } |
| 208 | 220 |
| 209 void WebSocketImpl::OnSocketOpened() { | 221 void WebSocketImpl::OnSocketOpened() { |
| 210 if (delegate_) | 222 delegate_->OnSocketOpened(); |
| 211 delegate_->OnSocketOpened(); | |
| 212 } | 223 } |
| 213 | 224 |
| 214 void WebSocketImpl::OnFrameRead(const std::string& message) { | 225 void WebSocketImpl::OnFrameRead(const std::string& message) { |
| 215 if (delegate_) | 226 delegate_->OnFrameRead(message); |
| 216 delegate_->OnFrameRead(message); | |
| 217 } | 227 } |
| 218 | 228 |
| 219 void WebSocketImpl::OnSocketClosed(bool closed_by_device) { | 229 void WebSocketImpl::OnSocketClosed() { |
| 220 if (delegate_) | 230 delegate_->OnSocketClosed(); |
| 221 delegate_->OnSocketClosed(closed_by_device); | |
| 222 } | 231 } |
| 223 | 232 |
| 224 } // namespace | 233 } // namespace |
| 225 | 234 |
| 226 scoped_refptr<AndroidDeviceManager::AndroidWebSocket> | 235 AndroidDeviceManager::AndroidWebSocket* |
| 227 AndroidDeviceManager::Device::CreateWebSocket( | 236 AndroidDeviceManager::Device::CreateWebSocket( |
| 228 const std::string& socket, | 237 const std::string& socket, |
| 229 const std::string& url, | 238 const std::string& url, |
| 230 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { | 239 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { |
| 231 return new WebSocketImpl(device_message_loop_, this, socket, url, delegate); | 240 return new WebSocketImpl(device_message_loop_, this, socket, url, delegate); |
| 232 } | 241 } |
| OLD | NEW |