Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(94)

Side by Side Diff: mojo/spy/spy.cc

Issue 648573002: remove websocket server dependency from the mojo spy (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: small cleanup Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/spy/spy.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/spy/spy.h" 5 #include "mojo/spy/spy.h"
6 6
7 #include <vector> 7 #include <vector>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
11 #include "base/location.h" 11 #include "base/location.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/memory/ref_counted.h" 13 #include "base/memory/ref_counted.h"
14 #include "base/message_loop/message_loop_proxy.h" 14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/strings/string_number_conversions.h" 15 #include "base/strings/string_number_conversions.h"
16 #include "base/strings/string_split.h" 16 #include "base/strings/string_split.h"
17 #include "base/threading/thread.h" 17 #include "base/threading/thread.h"
18 #include "base/threading/worker_pool.h" 18 #include "base/threading/worker_pool.h"
19 #include "base/time/time.h" 19 #include "base/time/time.h"
20 #include "mojo/application_manager/application_manager.h" 20 #include "mojo/application_manager/application_manager.h"
21 #include "mojo/public/cpp/system/core.h" 21 #include "mojo/public/cpp/system/core.h"
22 #include "mojo/spy/common.h" 22 #include "mojo/spy/common.h"
23 #include "mojo/spy/public/spy.mojom.h" 23 #include "mojo/spy/public/spy.mojom.h"
24 #include "mojo/spy/spy_server_impl.h" 24 #include "mojo/spy/spy_server_impl.h"
25 #include "mojo/spy/websocket_server.h"
26 #include "url/gurl.h" 25 #include "url/gurl.h"
27 26
28 namespace { 27 namespace {
29 28
30 mojo::WebSocketServer* ws_server = NULL;
31
32 const size_t kMessageBufSize = 2 * 1024; 29 const size_t kMessageBufSize = 2 * 1024;
33 const size_t kHandleBufSize = 64; 30 const size_t kHandleBufSize = 64;
34 const int kDefaultWebSocketPort = 42424; 31 const int kDefaultWebSocketPort = 42424;
35 32
36 void CloseHandles(MojoHandle* handles, size_t count) { 33 void CloseHandles(MojoHandle* handles, size_t count) {
37 for (size_t ix = 0; ix != count; ++count) 34 for (size_t ix = 0; ix != count; ++count)
38 MojoClose(handles[ix]); 35 MojoClose(handles[ix]);
39 } 36 }
40 37
41 // In charge of processing messages that flow over a 38 // In charge of processing messages that flow over a
42 // single message pipe. 39 // single message pipe.
43 class MessageProcessor : 40 class MessageProcessor :
44 public base::RefCountedThreadSafe<MessageProcessor> { 41 public base::RefCountedThreadSafe<MessageProcessor> {
45 public: 42 public:
46 MessageProcessor(base::MessageLoopProxy* control_loop_proxy) 43 MessageProcessor(mojo::Spy::WebSocketDelegate* websocket_delegate,
44 base::MessageLoopProxy* control_loop_proxy)
47 : last_result_(MOJO_RESULT_OK), 45 : last_result_(MOJO_RESULT_OK),
48 bytes_transfered_(0), 46 bytes_transfered_(0),
47 websocket_delegate_(websocket_delegate),
49 control_loop_proxy_(control_loop_proxy) { 48 control_loop_proxy_(control_loop_proxy) {
50 message_count_[0] = 0; 49 message_count_[0] = 0;
51 message_count_[1] = 0; 50 message_count_[1] = 0;
52 handle_count_[0] = 0; 51 handle_count_[0] = 0;
53 handle_count_[1] = 0; 52 handle_count_[1] = 0;
54 } 53 }
55 54
56 void Start(mojo::ScopedMessagePipeHandle client, 55 void Start(mojo::ScopedMessagePipeHandle client,
57 mojo::ScopedMessagePipeHandle interceptor, 56 mojo::ScopedMessagePipeHandle interceptor,
58 const GURL& url) { 57 const GURL& url) {
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 mojo::ScopedMessagePipeHandle message_pipe_handle; 107 mojo::ScopedMessagePipeHandle message_pipe_handle;
109 message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i])); 108 message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i]));
110 109
111 mojo::ScopedMessagePipeHandle faux_client; 110 mojo::ScopedMessagePipeHandle faux_client;
112 mojo::ScopedMessagePipeHandle interceptor; 111 mojo::ScopedMessagePipeHandle interceptor;
113 CreateMessagePipe(NULL, &faux_client, &interceptor); 112 CreateMessagePipe(NULL, &faux_client, &interceptor);
114 113
115 base::WorkerPool::PostTask( 114 base::WorkerPool::PostTask(
116 FROM_HERE, 115 FROM_HERE,
117 base::Bind(&MessageProcessor::Start, 116 base::Bind(&MessageProcessor::Start,
118 this, 117 this,
119 base::Passed(&message_pipe_handle), 118 base::Passed(&message_pipe_handle),
120 base::Passed(&interceptor), 119 base::Passed(&interceptor),
121 url), 120 url),
122 true); 121 true);
123 hbuf.get()[i] = faux_client.release().value(); 122 hbuf.get()[i] = faux_client.release().value();
124 } 123 }
125 } 124 }
126 } 125 }
127 ++message_count_[r]; 126 ++message_count_[r];
128 bytes_transfered_ += bytes_read; 127 bytes_transfered_ += bytes_read;
129 128
130 LogMessageInfo(mbuf.get(), url); 129 LogMessageInfo(mbuf.get(), url);
131 130
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
196 if ((header.flags & mojo::kMessageExpectsResponse) && 195 if ((header.flags & mojo::kMessageExpectsResponse) &&
197 (header.flags & mojo::kMessageIsResponse)) { 196 (header.flags & mojo::kMessageIsResponse)) {
198 LOG(ERROR) << "Invalid flags combination in request message."; 197 LOG(ERROR) << "Invalid flags combination in request message.";
199 LogInvalidMessage(header); 198 LogInvalidMessage(header);
200 return false; 199 return false;
201 } 200 }
202 return true; 201 return true;
203 } 202 }
204 203
205 void LogMessageInfo(void* data, const GURL& url) { 204 void LogMessageInfo(void* data, const GURL& url) {
205 if (!websocket_delegate_)
206 return;
207
206 mojo::MojoMessageData* message_data = 208 mojo::MojoMessageData* message_data =
207 reinterpret_cast<mojo::MojoMessageData*>(data); 209 reinterpret_cast<mojo::MojoMessageData*>(data);
208 if (IsValidMessage(message_data->header)) { 210 if (IsValidMessage(message_data->header)) {
209 control_loop_proxy_->PostTask( 211 control_loop_proxy_->PostTask(
210 FROM_HERE, 212 FROM_HERE,
211 base::Bind(&mojo::WebSocketServer::LogMessageInfo, 213 base::Bind(&mojo::Spy::WebSocketDelegate::OnMessage,
212 base::Unretained(ws_server), 214 base::Unretained(websocket_delegate_),
213 message_data->header, url, base::Time::Now())); 215 message_data,
216 url,
217 base::Time::Now()));
214 } 218 }
215 } 219 }
216 220
217 MojoResult last_result_; 221 MojoResult last_result_;
218 uint32_t bytes_transfered_; 222 uint32_t bytes_transfered_;
219 uint32_t message_count_[2]; 223 uint32_t message_count_[2];
220 uint32_t handle_count_[2]; 224 uint32_t handle_count_[2];
225 mojo::Spy::WebSocketDelegate* websocket_delegate_;
221 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_; 226 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
222 }; 227 };
223 228
224 // In charge of intercepting access to the service manager. 229 // In charge of intercepting access to the service manager.
225 class SpyInterceptor : public mojo::ApplicationManager::Interceptor { 230 class SpyInterceptor : public mojo::ApplicationManager::Interceptor {
226 public: 231 public:
227 explicit SpyInterceptor( 232 explicit SpyInterceptor(
228 scoped_refptr<mojo::SpyServerImpl> spy_server, 233 scoped_refptr<mojo::SpyServerImpl> spy_server,
229 const scoped_refptr<base::MessageLoopProxy>& control_loop_proxy) 234 const scoped_refptr<base::MessageLoopProxy>& control_loop_proxy,
235 mojo::Spy::WebSocketDelegate* websocket_delegate)
230 : spy_server_(spy_server), 236 : spy_server_(spy_server),
231 proxy_(base::MessageLoopProxy::current()), 237 proxy_(base::MessageLoopProxy::current()),
232 control_loop_proxy_(control_loop_proxy) {} 238 control_loop_proxy_(control_loop_proxy),
239 websocket_delegate_(websocket_delegate) {}
233 240
234 private: 241 private:
235 virtual mojo::ServiceProviderPtr OnConnectToClient( 242 virtual mojo::ServiceProviderPtr OnConnectToClient(
236 const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE { 243 const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE {
237 if (!MustIntercept(url)) 244 if (!MustIntercept(url))
238 return real_client.Pass(); 245 return real_client.Pass();
239 246
240 // You can get an invalid handle if the app (or service) is 247 // You can get an invalid handle if the app (or service) is
241 // created by unconventional means, for example the command line. 248 // created by unconventional means, for example the command line.
242 if (!real_client) 249 if (!real_client)
243 return real_client.Pass(); 250 return real_client.Pass();
244 251
245 mojo::ScopedMessagePipeHandle faux_client; 252 mojo::ScopedMessagePipeHandle faux_client;
246 mojo::ScopedMessagePipeHandle interceptor; 253 mojo::ScopedMessagePipeHandle interceptor;
247 CreateMessagePipe(NULL, &faux_client, &interceptor); 254 CreateMessagePipe(NULL, &faux_client, &interceptor);
248 255
249 scoped_refptr<MessageProcessor> processor = 256 scoped_refptr<MessageProcessor> processor =
250 new MessageProcessor(control_loop_proxy_.get()); 257 new MessageProcessor(websocket_delegate_, control_loop_proxy_.get());
251 mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe(); 258 mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe();
252 base::WorkerPool::PostTask( 259 base::WorkerPool::PostTask(
253 FROM_HERE, 260 FROM_HERE,
254 base::Bind(&MessageProcessor::Start, 261 base::Bind(&MessageProcessor::Start,
255 processor, 262 processor,
256 base::Passed(&real_handle), base::Passed(&interceptor), 263 base::Passed(&real_handle), base::Passed(&interceptor),
257 url), 264 url),
258 true); 265 true);
259 266
260 mojo::ServiceProviderPtr faux_provider; 267 mojo::ServiceProviderPtr faux_provider;
261 faux_provider.Bind(faux_client.Pass()); 268 faux_provider.Bind(faux_client.Pass());
262 return faux_provider.Pass(); 269 return faux_provider.Pass();
263 } 270 }
264 271
265 bool MustIntercept(const GURL& url) { 272 bool MustIntercept(const GURL& url) {
266 // TODO(cpu): manage who and when to intercept. 273 // TODO(cpu): manage who and when to intercept.
267 proxy_->PostTask( 274 proxy_->PostTask(
268 FROM_HERE, 275 FROM_HERE,
269 base::Bind(&mojo::SpyServerImpl::OnIntercept, spy_server_, url)); 276 base::Bind(&mojo::SpyServerImpl::OnIntercept, spy_server_, url));
270 return true; 277 return true;
271 } 278 }
272 279
273 scoped_refptr<mojo::SpyServerImpl> spy_server_; 280 scoped_refptr<mojo::SpyServerImpl> spy_server_;
274 scoped_refptr<base::MessageLoopProxy> proxy_; 281 scoped_refptr<base::MessageLoopProxy> proxy_;
275 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_; 282 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
283 mojo::Spy::WebSocketDelegate* websocket_delegate_;
276 }; 284 };
277 285
278 void StartWebServer(int port, mojo::ScopedMessagePipeHandle pipe) { 286 } // namespace
279 // TODO(cpu) figure out lifetime of the server. See Spy() dtor. 287
280 ws_server = new mojo::WebSocketServer(port, pipe.Pass()); 288 namespace mojo {
281 ws_server->Start();
282 }
283 289
284 struct SpyOptions { 290 struct SpyOptions {
285 int websocket_port; 291 int websocket_port;
286 292
287 SpyOptions() 293 SpyOptions()
288 : websocket_port(kDefaultWebSocketPort) { 294 : websocket_port(kDefaultWebSocketPort) {
289 } 295 }
290 }; 296 };
291 297
292 SpyOptions ProcessOptions(const std::string& options) { 298 SpyOptions ProcessOptions(const std::string& options) {
293 SpyOptions spy_options; 299 SpyOptions spy_options;
294 if (options.empty()) 300 if (options.empty())
295 return spy_options; 301 return spy_options;
296 base::StringPairs kv_pairs; 302 base::StringPairs kv_pairs;
297 base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs); 303 base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs);
298 base::StringPairs::iterator it = kv_pairs.begin(); 304 base::StringPairs::iterator it = kv_pairs.begin();
299 for (; it != kv_pairs.end(); ++it) { 305 for (; it != kv_pairs.end(); ++it) {
300 if (it->first == "port") { 306 if (it->first == "port") {
301 int port; 307 int port;
302 if (base::StringToInt(it->second, &port)) 308 if (base::StringToInt(it->second, &port))
303 spy_options.websocket_port = port; 309 spy_options.websocket_port = port;
304 } 310 }
305 } 311 }
306 return spy_options; 312 return spy_options;
307 } 313 }
308 314
309 } // namespace
310
311 namespace mojo {
312
313 Spy::Spy(mojo::ApplicationManager* application_manager, 315 Spy::Spy(mojo::ApplicationManager* application_manager,
314 const std::string& options) { 316 const std::string& options)
315 SpyOptions spy_options = ProcessOptions(options); 317 : websocket_delegate_(NULL),
316 318 application_manager_(application_manager),
319 spy_options_(NULL) {
320 spy_options_ = new SpyOptions(ProcessOptions(options));
317 spy_server_ = new SpyServerImpl(); 321 spy_server_ = new SpyServerImpl();
318 322
319 // Start the tread what will accept commands from the frontend. 323 // Start the tread what will accept commands from the frontend.
320 control_thread_.reset(new base::Thread("mojo_spy_control_thread")); 324 control_thread_.reset(new base::Thread("mojo_spy_control_thread"));
321 base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0); 325 base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0);
322 control_thread_->StartWithOptions(thread_options); 326 control_thread_->StartWithOptions(thread_options);
323 control_thread_->message_loop_proxy()->PostTask(
324 FROM_HERE, base::Bind(&StartWebServer,
325 spy_options.websocket_port,
326 base::Passed(spy_server_->ServerPipe())));
327
328 // Start intercepting mojo services.
329 application_manager->SetInterceptor(
330 new SpyInterceptor(spy_server_, control_thread_->message_loop_proxy()));
331 } 327 }
332 328
333 Spy::~Spy() { 329 Spy::~Spy() {
334 // TODO(cpu): Do not leak the interceptor. Lifetime between the 330 // TODO(cpu): Do not leak the interceptor. Lifetime between the
335 // application_manager and the spy is still unclear hence the leak. 331 // application_manager and the spy is still unclear hence the leak.
336 } 332 }
337 333
334 void Spy::SetWebSocketDelegate(WebSocketDelegate* websocket_delegate) {
335 DCHECK(websocket_delegate_ == NULL);
336 websocket_delegate_ = websocket_delegate;
337
338 control_thread_->message_loop_proxy()->PostTask(
339 FROM_HERE,
340 base::Bind(&WebSocketDelegate::Start,
341 base::Unretained(websocket_delegate_),
342 spy_options_->websocket_port,
343 base::Passed(spy_server_->ServerPipe())));
344 // Start intercepting mojo services.
345 application_manager_->SetInterceptor(new SpyInterceptor(
346 spy_server_, control_thread_->message_loop_proxy(), websocket_delegate_));
347 }
348
338 } // namespace mojo 349 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/spy/spy.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698