OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/spy/spy.h" | 5 #include "mojo/spy/spy.h" |
6 | 6 |
7 #include <vector> | 7 #include <vector> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/compiler_specific.h" | 10 #include "base/compiler_specific.h" |
11 #include "base/location.h" | 11 #include "base/location.h" |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/memory/ref_counted.h" | 13 #include "base/memory/ref_counted.h" |
14 #include "base/message_loop/message_loop_proxy.h" | 14 #include "base/message_loop/message_loop_proxy.h" |
15 #include "base/strings/string_number_conversions.h" | 15 #include "base/strings/string_number_conversions.h" |
16 #include "base/strings/string_split.h" | 16 #include "base/strings/string_split.h" |
17 #include "base/threading/thread.h" | 17 #include "base/threading/thread.h" |
18 #include "base/threading/worker_pool.h" | 18 #include "base/threading/worker_pool.h" |
19 #include "base/time/time.h" | 19 #include "base/time/time.h" |
20 #include "mojo/application_manager/application_manager.h" | 20 #include "mojo/application_manager/application_manager.h" |
21 #include "mojo/public/cpp/system/core.h" | 21 #include "mojo/public/cpp/system/core.h" |
22 #include "mojo/spy/common.h" | 22 #include "mojo/spy/common.h" |
23 #include "mojo/spy/public/spy.mojom.h" | 23 #include "mojo/spy/public/spy.mojom.h" |
24 #include "mojo/spy/spy_server_impl.h" | 24 #include "mojo/spy/spy_server_impl.h" |
25 #include "mojo/spy/websocket_server.h" | |
26 #include "url/gurl.h" | 25 #include "url/gurl.h" |
27 | 26 |
28 namespace { | 27 namespace { |
29 | 28 |
30 mojo::WebSocketServer* ws_server = NULL; | |
31 | |
32 const size_t kMessageBufSize = 2 * 1024; | 29 const size_t kMessageBufSize = 2 * 1024; |
33 const size_t kHandleBufSize = 64; | 30 const size_t kHandleBufSize = 64; |
34 const int kDefaultWebSocketPort = 42424; | 31 const int kDefaultWebSocketPort = 42424; |
35 | 32 |
36 void CloseHandles(MojoHandle* handles, size_t count) { | 33 void CloseHandles(MojoHandle* handles, size_t count) { |
37 for (size_t ix = 0; ix != count; ++count) | 34 for (size_t ix = 0; ix != count; ++count) |
38 MojoClose(handles[ix]); | 35 MojoClose(handles[ix]); |
39 } | 36 } |
40 | 37 |
41 // In charge of processing messages that flow over a | 38 // In charge of processing messages that flow over a |
42 // single message pipe. | 39 // single message pipe. |
43 class MessageProcessor : | 40 class MessageProcessor : |
44 public base::RefCountedThreadSafe<MessageProcessor> { | 41 public base::RefCountedThreadSafe<MessageProcessor> { |
45 public: | 42 public: |
46 MessageProcessor(base::MessageLoopProxy* control_loop_proxy) | 43 MessageProcessor(mojo::Spy::WebSocketDelegate* websocket_delegate, |
| 44 base::MessageLoopProxy* control_loop_proxy) |
47 : last_result_(MOJO_RESULT_OK), | 45 : last_result_(MOJO_RESULT_OK), |
48 bytes_transfered_(0), | 46 bytes_transfered_(0), |
| 47 websocket_delegate_(websocket_delegate), |
49 control_loop_proxy_(control_loop_proxy) { | 48 control_loop_proxy_(control_loop_proxy) { |
50 message_count_[0] = 0; | 49 message_count_[0] = 0; |
51 message_count_[1] = 0; | 50 message_count_[1] = 0; |
52 handle_count_[0] = 0; | 51 handle_count_[0] = 0; |
53 handle_count_[1] = 0; | 52 handle_count_[1] = 0; |
54 } | 53 } |
55 | 54 |
56 void Start(mojo::ScopedMessagePipeHandle client, | 55 void Start(mojo::ScopedMessagePipeHandle client, |
57 mojo::ScopedMessagePipeHandle interceptor, | 56 mojo::ScopedMessagePipeHandle interceptor, |
58 const GURL& url) { | 57 const GURL& url) { |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
108 mojo::ScopedMessagePipeHandle message_pipe_handle; | 107 mojo::ScopedMessagePipeHandle message_pipe_handle; |
109 message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i])); | 108 message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i])); |
110 | 109 |
111 mojo::ScopedMessagePipeHandle faux_client; | 110 mojo::ScopedMessagePipeHandle faux_client; |
112 mojo::ScopedMessagePipeHandle interceptor; | 111 mojo::ScopedMessagePipeHandle interceptor; |
113 CreateMessagePipe(NULL, &faux_client, &interceptor); | 112 CreateMessagePipe(NULL, &faux_client, &interceptor); |
114 | 113 |
115 base::WorkerPool::PostTask( | 114 base::WorkerPool::PostTask( |
116 FROM_HERE, | 115 FROM_HERE, |
117 base::Bind(&MessageProcessor::Start, | 116 base::Bind(&MessageProcessor::Start, |
118 this, | 117 this, |
119 base::Passed(&message_pipe_handle), | 118 base::Passed(&message_pipe_handle), |
120 base::Passed(&interceptor), | 119 base::Passed(&interceptor), |
121 url), | 120 url), |
122 true); | 121 true); |
123 hbuf.get()[i] = faux_client.release().value(); | 122 hbuf.get()[i] = faux_client.release().value(); |
124 } | 123 } |
125 } | 124 } |
126 } | 125 } |
127 ++message_count_[r]; | 126 ++message_count_[r]; |
128 bytes_transfered_ += bytes_read; | 127 bytes_transfered_ += bytes_read; |
129 | 128 |
130 LogMessageInfo(mbuf.get(), url); | 129 LogMessageInfo(mbuf.get(), url); |
131 | 130 |
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
196 if ((header.flags & mojo::kMessageExpectsResponse) && | 195 if ((header.flags & mojo::kMessageExpectsResponse) && |
197 (header.flags & mojo::kMessageIsResponse)) { | 196 (header.flags & mojo::kMessageIsResponse)) { |
198 LOG(ERROR) << "Invalid flags combination in request message."; | 197 LOG(ERROR) << "Invalid flags combination in request message."; |
199 LogInvalidMessage(header); | 198 LogInvalidMessage(header); |
200 return false; | 199 return false; |
201 } | 200 } |
202 return true; | 201 return true; |
203 } | 202 } |
204 | 203 |
205 void LogMessageInfo(void* data, const GURL& url) { | 204 void LogMessageInfo(void* data, const GURL& url) { |
| 205 if (!websocket_delegate_) |
| 206 return; |
| 207 |
206 mojo::MojoMessageData* message_data = | 208 mojo::MojoMessageData* message_data = |
207 reinterpret_cast<mojo::MojoMessageData*>(data); | 209 reinterpret_cast<mojo::MojoMessageData*>(data); |
208 if (IsValidMessage(message_data->header)) { | 210 if (IsValidMessage(message_data->header)) { |
209 control_loop_proxy_->PostTask( | 211 control_loop_proxy_->PostTask( |
210 FROM_HERE, | 212 FROM_HERE, |
211 base::Bind(&mojo::WebSocketServer::LogMessageInfo, | 213 base::Bind(&mojo::Spy::WebSocketDelegate::OnMessage, |
212 base::Unretained(ws_server), | 214 base::Unretained(websocket_delegate_), |
213 message_data->header, url, base::Time::Now())); | 215 message_data, |
| 216 url, |
| 217 base::Time::Now())); |
214 } | 218 } |
215 } | 219 } |
216 | 220 |
217 MojoResult last_result_; | 221 MojoResult last_result_; |
218 uint32_t bytes_transfered_; | 222 uint32_t bytes_transfered_; |
219 uint32_t message_count_[2]; | 223 uint32_t message_count_[2]; |
220 uint32_t handle_count_[2]; | 224 uint32_t handle_count_[2]; |
| 225 mojo::Spy::WebSocketDelegate* websocket_delegate_; |
221 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_; | 226 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_; |
222 }; | 227 }; |
223 | 228 |
224 // In charge of intercepting access to the service manager. | 229 // In charge of intercepting access to the service manager. |
225 class SpyInterceptor : public mojo::ApplicationManager::Interceptor { | 230 class SpyInterceptor : public mojo::ApplicationManager::Interceptor { |
226 public: | 231 public: |
227 explicit SpyInterceptor( | 232 explicit SpyInterceptor( |
228 scoped_refptr<mojo::SpyServerImpl> spy_server, | 233 scoped_refptr<mojo::SpyServerImpl> spy_server, |
229 const scoped_refptr<base::MessageLoopProxy>& control_loop_proxy) | 234 const scoped_refptr<base::MessageLoopProxy>& control_loop_proxy, |
| 235 mojo::Spy::WebSocketDelegate* websocket_delegate) |
230 : spy_server_(spy_server), | 236 : spy_server_(spy_server), |
231 proxy_(base::MessageLoopProxy::current()), | 237 proxy_(base::MessageLoopProxy::current()), |
232 control_loop_proxy_(control_loop_proxy) {} | 238 control_loop_proxy_(control_loop_proxy), |
| 239 websocket_delegate_(websocket_delegate) {} |
233 | 240 |
234 private: | 241 private: |
235 virtual mojo::ServiceProviderPtr OnConnectToClient( | 242 virtual mojo::ServiceProviderPtr OnConnectToClient( |
236 const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE { | 243 const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE { |
237 if (!MustIntercept(url)) | 244 if (!MustIntercept(url)) |
238 return real_client.Pass(); | 245 return real_client.Pass(); |
239 | 246 |
240 // You can get an invalid handle if the app (or service) is | 247 // You can get an invalid handle if the app (or service) is |
241 // created by unconventional means, for example the command line. | 248 // created by unconventional means, for example the command line. |
242 if (!real_client) | 249 if (!real_client) |
243 return real_client.Pass(); | 250 return real_client.Pass(); |
244 | 251 |
245 mojo::ScopedMessagePipeHandle faux_client; | 252 mojo::ScopedMessagePipeHandle faux_client; |
246 mojo::ScopedMessagePipeHandle interceptor; | 253 mojo::ScopedMessagePipeHandle interceptor; |
247 CreateMessagePipe(NULL, &faux_client, &interceptor); | 254 CreateMessagePipe(NULL, &faux_client, &interceptor); |
248 | 255 |
249 scoped_refptr<MessageProcessor> processor = | 256 scoped_refptr<MessageProcessor> processor = |
250 new MessageProcessor(control_loop_proxy_.get()); | 257 new MessageProcessor(websocket_delegate_, control_loop_proxy_.get()); |
251 mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe(); | 258 mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe(); |
252 base::WorkerPool::PostTask( | 259 base::WorkerPool::PostTask( |
253 FROM_HERE, | 260 FROM_HERE, |
254 base::Bind(&MessageProcessor::Start, | 261 base::Bind(&MessageProcessor::Start, |
255 processor, | 262 processor, |
256 base::Passed(&real_handle), base::Passed(&interceptor), | 263 base::Passed(&real_handle), base::Passed(&interceptor), |
257 url), | 264 url), |
258 true); | 265 true); |
259 | 266 |
260 mojo::ServiceProviderPtr faux_provider; | 267 mojo::ServiceProviderPtr faux_provider; |
261 faux_provider.Bind(faux_client.Pass()); | 268 faux_provider.Bind(faux_client.Pass()); |
262 return faux_provider.Pass(); | 269 return faux_provider.Pass(); |
263 } | 270 } |
264 | 271 |
265 bool MustIntercept(const GURL& url) { | 272 bool MustIntercept(const GURL& url) { |
266 // TODO(cpu): manage who and when to intercept. | 273 // TODO(cpu): manage who and when to intercept. |
267 proxy_->PostTask( | 274 proxy_->PostTask( |
268 FROM_HERE, | 275 FROM_HERE, |
269 base::Bind(&mojo::SpyServerImpl::OnIntercept, spy_server_, url)); | 276 base::Bind(&mojo::SpyServerImpl::OnIntercept, spy_server_, url)); |
270 return true; | 277 return true; |
271 } | 278 } |
272 | 279 |
273 scoped_refptr<mojo::SpyServerImpl> spy_server_; | 280 scoped_refptr<mojo::SpyServerImpl> spy_server_; |
274 scoped_refptr<base::MessageLoopProxy> proxy_; | 281 scoped_refptr<base::MessageLoopProxy> proxy_; |
275 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_; | 282 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_; |
| 283 mojo::Spy::WebSocketDelegate* websocket_delegate_; |
276 }; | 284 }; |
277 | 285 |
278 void StartWebServer(int port, mojo::ScopedMessagePipeHandle pipe) { | 286 } // namespace |
279 // TODO(cpu) figure out lifetime of the server. See Spy() dtor. | 287 |
280 ws_server = new mojo::WebSocketServer(port, pipe.Pass()); | 288 namespace mojo { |
281 ws_server->Start(); | |
282 } | |
283 | 289 |
284 struct SpyOptions { | 290 struct SpyOptions { |
285 int websocket_port; | 291 int websocket_port; |
286 | 292 |
287 SpyOptions() | 293 SpyOptions() |
288 : websocket_port(kDefaultWebSocketPort) { | 294 : websocket_port(kDefaultWebSocketPort) { |
289 } | 295 } |
290 }; | 296 }; |
291 | 297 |
292 SpyOptions ProcessOptions(const std::string& options) { | 298 SpyOptions ProcessOptions(const std::string& options) { |
293 SpyOptions spy_options; | 299 SpyOptions spy_options; |
294 if (options.empty()) | 300 if (options.empty()) |
295 return spy_options; | 301 return spy_options; |
296 base::StringPairs kv_pairs; | 302 base::StringPairs kv_pairs; |
297 base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs); | 303 base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs); |
298 base::StringPairs::iterator it = kv_pairs.begin(); | 304 base::StringPairs::iterator it = kv_pairs.begin(); |
299 for (; it != kv_pairs.end(); ++it) { | 305 for (; it != kv_pairs.end(); ++it) { |
300 if (it->first == "port") { | 306 if (it->first == "port") { |
301 int port; | 307 int port; |
302 if (base::StringToInt(it->second, &port)) | 308 if (base::StringToInt(it->second, &port)) |
303 spy_options.websocket_port = port; | 309 spy_options.websocket_port = port; |
304 } | 310 } |
305 } | 311 } |
306 return spy_options; | 312 return spy_options; |
307 } | 313 } |
308 | 314 |
309 } // namespace | |
310 | |
311 namespace mojo { | |
312 | |
313 Spy::Spy(mojo::ApplicationManager* application_manager, | 315 Spy::Spy(mojo::ApplicationManager* application_manager, |
314 const std::string& options) { | 316 const std::string& options) |
315 SpyOptions spy_options = ProcessOptions(options); | 317 : websocket_delegate_(NULL), |
316 | 318 application_manager_(application_manager), |
| 319 spy_options_(NULL) { |
| 320 spy_options_ = new SpyOptions(ProcessOptions(options)); |
317 spy_server_ = new SpyServerImpl(); | 321 spy_server_ = new SpyServerImpl(); |
318 | 322 |
319 // Start the tread what will accept commands from the frontend. | 323 // Start the tread what will accept commands from the frontend. |
320 control_thread_.reset(new base::Thread("mojo_spy_control_thread")); | 324 control_thread_.reset(new base::Thread("mojo_spy_control_thread")); |
321 base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0); | 325 base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0); |
322 control_thread_->StartWithOptions(thread_options); | 326 control_thread_->StartWithOptions(thread_options); |
323 control_thread_->message_loop_proxy()->PostTask( | |
324 FROM_HERE, base::Bind(&StartWebServer, | |
325 spy_options.websocket_port, | |
326 base::Passed(spy_server_->ServerPipe()))); | |
327 | |
328 // Start intercepting mojo services. | |
329 application_manager->SetInterceptor( | |
330 new SpyInterceptor(spy_server_, control_thread_->message_loop_proxy())); | |
331 } | 327 } |
332 | 328 |
333 Spy::~Spy() { | 329 Spy::~Spy() { |
334 // TODO(cpu): Do not leak the interceptor. Lifetime between the | 330 // TODO(cpu): Do not leak the interceptor. Lifetime between the |
335 // application_manager and the spy is still unclear hence the leak. | 331 // application_manager and the spy is still unclear hence the leak. |
336 } | 332 } |
337 | 333 |
| 334 void Spy::SetWebSocketDelegate(WebSocketDelegate* websocket_delegate) { |
| 335 DCHECK(websocket_delegate_ == NULL); |
| 336 websocket_delegate_ = websocket_delegate; |
| 337 |
| 338 control_thread_->message_loop_proxy()->PostTask( |
| 339 FROM_HERE, |
| 340 base::Bind(&WebSocketDelegate::Start, |
| 341 base::Unretained(websocket_delegate_), |
| 342 spy_options_->websocket_port, |
| 343 base::Passed(spy_server_->ServerPipe()))); |
| 344 // Start intercepting mojo services. |
| 345 application_manager_->SetInterceptor(new SpyInterceptor( |
| 346 spy_server_, control_thread_->message_loop_proxy(), websocket_delegate_)); |
| 347 } |
| 348 |
338 } // namespace mojo | 349 } // namespace mojo |
OLD | NEW |