Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: mojo/spy/spy.cc

Issue 354043003: Add support for logging information about mojo messages retrieved by the mojo debugger (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Intercept the first set of pipes Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/spy/spy.h" 5 #include "mojo/spy/spy.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/compiler_specific.h" 8 #include "base/compiler_specific.h"
9 #include "base/location.h" 9 #include "base/location.h"
10 #include "base/logging.h"
10 #include "base/memory/ref_counted.h" 11 #include "base/memory/ref_counted.h"
12 #include "base/time/time.h"
11 #include "base/strings/string_number_conversions.h" 13 #include "base/strings/string_number_conversions.h"
12 #include "base/strings/string_split.h" 14 #include "base/strings/string_split.h"
13 #include "base/threading/thread.h" 15 #include "base/threading/thread.h"
14 #include "base/threading/worker_pool.h" 16 #include "base/threading/worker_pool.h"
15 17
16 #include "mojo/public/cpp/system/core.h" 18 #include "mojo/public/cpp/system/core.h"
17 #include "mojo/service_manager/service_manager.h" 19 #include "mojo/service_manager/service_manager.h"
20 #include "mojo/spy/common.h"
18 #include "mojo/spy/websocket_server.h" 21 #include "mojo/spy/websocket_server.h"
19 22
20 namespace { 23 namespace {
21 24
25 spy::WebSocketServer* ws_server = NULL;
26
22 const size_t kMessageBufSize = 2 * 1024; 27 const size_t kMessageBufSize = 2 * 1024;
23 const size_t kHandleBufSize = 64; 28 const size_t kHandleBufSize = 64;
24 const int kDefaultWebSocketPort = 42424; 29 const int kDefaultWebSocketPort = 42424;
25 30
26 void CloseHandles(MojoHandle* handles, size_t count) { 31 void CloseHandles(MojoHandle* handles, size_t count) {
27 for (size_t ix = 0; ix != count; ++count) 32 for (size_t ix = 0; ix != count; ++count)
28 MojoClose(handles[ix]); 33 MojoClose(handles[ix]);
29 } 34 }
30 35
31 // In charge of processing messages that flow over a 36 // In charge of processing messages that flow over a
32 // single message pipe. 37 // single message pipe.
33 class MessageProcessor : 38 class MessageProcessor :
34 public base::RefCountedThreadSafe<MessageProcessor> { 39 public base::RefCountedThreadSafe<MessageProcessor> {
35 public: 40 public:
36 41
37 MessageProcessor() 42 MessageProcessor(base::MessageLoopProxy* control_loop_proxy)
38 : last_result_(MOJO_RESULT_OK), 43 : last_result_(MOJO_RESULT_OK),
39 bytes_transfered_(0) { 44 bytes_transfered_(0),
45 control_loop_proxy_(control_loop_proxy),
46 service_vendor_message_pipe_received_(false) {
40 47
41 message_count_[0] = 0; 48 message_count_[0] = 0;
42 message_count_[1] = 0; 49 message_count_[1] = 0;
43 handle_count_[0] = 0; 50 handle_count_[0] = 0;
44 handle_count_[1] = 0; 51 handle_count_[1] = 0;
45 } 52 }
46 53
47 void Start(mojo::ScopedMessagePipeHandle client, 54 void Start(mojo::ScopedMessagePipeHandle client,
48 mojo::ScopedMessagePipeHandle interceptor) { 55 mojo::ScopedMessagePipeHandle interceptor,
56 const GURL& url) {
49 std::vector<mojo::MessagePipeHandle> pipes; 57 std::vector<mojo::MessagePipeHandle> pipes;
50 pipes.push_back(client.get()); 58 pipes.push_back(client.get());
51 pipes.push_back(interceptor.get()); 59 pipes.push_back(interceptor.get());
52 std::vector<MojoHandleSignals> handle_signals; 60 std::vector<MojoHandleSignals> handle_signals;
53 handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); 61 handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
54 handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); 62 handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
55 63
56 scoped_ptr<char[]> mbuf(new char[kMessageBufSize]); 64 scoped_ptr<char[]> mbuf(new char[kMessageBufSize]);
57 scoped_ptr<MojoHandle[]> hbuf(new MojoHandle[kHandleBufSize]); 65 scoped_ptr<MojoHandle[]> hbuf(new MojoHandle[kHandleBufSize]);
58 66
(...skipping 16 matching lines...) Expand all
75 83
76 if (!CheckResult(ReadMessageRaw(pipes[r], 84 if (!CheckResult(ReadMessageRaw(pipes[r],
77 mbuf.get(), &bytes_read, 85 mbuf.get(), &bytes_read,
78 hbuf.get(), &handles_read, 86 hbuf.get(), &handles_read,
79 MOJO_READ_MESSAGE_FLAG_NONE))) 87 MOJO_READ_MESSAGE_FLAG_NONE)))
80 break; 88 break;
81 89
82 if (!bytes_read && !handles_read) 90 if (!bytes_read && !handles_read)
83 continue; 91 continue;
84 92
85 if (handles_read) 93 if (handles_read) {
86 handle_count_[r] += handles_read; 94 handle_count_[r] += handles_read;
87 95
96 // Intercept the first set of handles to message pipes with the
97 // assumption that these would be used for vending mojo services.
98 // TODO(ananta)
99 // The above approach is hacky and could cause us to miss other message
100 // pipes which could be exchanged between the client and the server.
101 // Look into a cleaner way of identifying message pipe handles.
102 if (!service_vendor_message_pipe_received_) {
103 service_vendor_message_pipe_received_ = true;
104 for (uint32_t i = 0; i < handles_read; i++) {
105 mojo::ScopedMessagePipeHandle message_pipe_handle;
106 message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i]));
107
108 mojo::ScopedMessagePipeHandle faux_client;
109 mojo::ScopedMessagePipeHandle interceptor;
110 CreateMessagePipe(NULL, &faux_client, &interceptor);
111
112 base::WorkerPool::PostTask(
113 FROM_HERE,
114 base::Bind(&MessageProcessor::Start,
115 this,
116 base::Passed(&message_pipe_handle),
117 base::Passed(&interceptor),
118 url),
119 true);
120 hbuf.get()[i] = faux_client.release().value();
121 }
122 }
123 }
88 ++message_count_[r]; 124 ++message_count_[r];
89 bytes_transfered_ += bytes_read; 125 bytes_transfered_ += bytes_read;
90 126
127 LogMessageInfo(mbuf.get(), url);
128
91 mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0]; 129 mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0];
92 if (!CheckResult(Wait(write_handle, 130 if (!CheckResult(Wait(write_handle,
93 MOJO_HANDLE_SIGNAL_WRITABLE, 131 MOJO_HANDLE_SIGNAL_WRITABLE,
94 MOJO_DEADLINE_INDEFINITE))) 132 MOJO_DEADLINE_INDEFINITE)))
95 break; 133 break;
96 134
97 if (!CheckResult(WriteMessageRaw(write_handle, 135 if (!CheckResult(WriteMessageRaw(write_handle,
98 mbuf.get(), bytes_read, 136 mbuf.get(), bytes_read,
99 hbuf.get(), handles_read, 137 hbuf.get(), handles_read,
100 MOJO_WRITE_MESSAGE_FLAG_NONE))) { 138 MOJO_WRITE_MESSAGE_FLAG_NONE))) {
101 // On failure we own the handles. For now just close them. 139 // On failure we own the handles. For now just close them.
102 if (handles_read) 140 if (handles_read)
103 CloseHandles(hbuf.get(), handles_read); 141 CloseHandles(hbuf.get(), handles_read);
104 break; 142 break;
105 } 143 }
106 } 144 }
107 } 145 }
108 146
109 private: 147 private:
110 friend class base::RefCountedThreadSafe<MessageProcessor>; 148 friend class base::RefCountedThreadSafe<MessageProcessor>;
111 virtual ~MessageProcessor() {} 149 virtual ~MessageProcessor() {}
112 150
113 bool CheckResult(MojoResult mr) { 151 bool CheckResult(MojoResult mr) {
114 if (mr == MOJO_RESULT_OK) 152 if (mr == MOJO_RESULT_OK)
115 return true; 153 return true;
116 last_result_ = mr; 154 last_result_ = mr;
117 return false; 155 return false;
118 } 156 }
119 157
120 MojoResult last_result_; 158 void LogInvalidMessage(const mojo::MojoMessageHeader& header) {
121 uint32_t bytes_transfered_; 159 LOG(ERROR) << "Invalid message: Number of Fields: "
122 uint32_t message_count_[2]; 160 << header.num_fields
123 uint32_t handle_count_[2]; 161 << " Number of bytes: "
162 << header.num_bytes
163 << " Flags: "
164 << header.flags;
165 }
166
167 // Validates the message as per the mojo spec.
168 bool IsValidMessage(const mojo::MojoMessageHeader& header) {
169 if (header.num_fields == 2) {
170 if (header.num_bytes != sizeof(mojo::MojoMessageHeader)) {
171 LogInvalidMessage(header);
172 return false;
173 }
174 } else if (header.num_fields == 3) {
175 if (header.num_bytes != sizeof(mojo::MojoRequestHeader)) {
176 LogInvalidMessage(header);
177 }
178 } else if (header.num_fields > 3) {
179 if (header.num_bytes < sizeof(mojo::MojoRequestHeader)) {
180 LogInvalidMessage(header);
181 return false;
182 }
183 }
184
185 // These flags should be specified in request or response messages.
186 if (header.num_fields < 3 &&
187 ((header.flags & mojo::kMessageExpectsResponse) ||
188 (header.flags & mojo::kMessageIsResponse))) {
189 LOG(ERROR) << "Invalid request message.";
190 LogInvalidMessage(header);
191 return false;
192 }
193 // These flags are mutually exclusive.
194 if ((header.flags & mojo::kMessageExpectsResponse) &&
195 (header.flags & mojo::kMessageIsResponse)) {
196 LOG(ERROR) << "Invalid flags combination in request message.";
197 LogInvalidMessage(header);
198 return false;
199 }
200 return true;
201 }
202
203 void LogMessageInfo(void* data, const GURL& url) {
204 mojo::MojoMessageData* message_data =
205 reinterpret_cast<mojo::MojoMessageData*>(data);
206 if (IsValidMessage(message_data->header)) {
207 control_loop_proxy_->PostTask(
208 FROM_HERE,
209 base::Bind(&spy::WebSocketServer::LogMessageInfo,
210 base::Unretained(ws_server),
211 message_data->header, url, base::Time::Now()));
212 }
213 }
214
215 MojoResult last_result_;
216 uint32_t bytes_transfered_;
217 uint32_t message_count_[2];
218 uint32_t handle_count_[2];
219 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
220 // This flag helps us intercept the first message pipe exchanged between
221 // the client and the service vendor.
222 bool service_vendor_message_pipe_received_;
124 }; 223 };
125 224
126 // In charge of intercepting access to the service manager. 225 // In charge of intercepting access to the service manager.
127 class SpyInterceptor : public mojo::ServiceManager::Interceptor { 226 class SpyInterceptor : public mojo::ServiceManager::Interceptor {
227 public:
228 SpyInterceptor(base::MessageLoopProxy* control_loop_proxy)
229 : control_loop_proxy_(control_loop_proxy) {}
230
128 private: 231 private:
129 virtual mojo::ServiceProviderPtr OnConnectToClient( 232 virtual mojo::ServiceProviderPtr OnConnectToClient(
130 const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE { 233 const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE {
131 if (!MustIntercept(url)) 234 if (!MustIntercept(url))
132 return real_client.Pass(); 235 return real_client.Pass();
133 236
134 // You can get an invalid handle if the app (or service) is 237 // You can get an invalid handle if the app (or service) is
135 // created by unconventional means, for example the command line. 238 // created by unconventional means, for example the command line.
136 if (!real_client.get()) 239 if (!real_client.get())
137 return real_client.Pass(); 240 return real_client.Pass();
138 241
139 mojo::ScopedMessagePipeHandle faux_client; 242 mojo::ScopedMessagePipeHandle faux_client;
140 mojo::ScopedMessagePipeHandle interceptor; 243 mojo::ScopedMessagePipeHandle interceptor;
141 CreateMessagePipe(NULL, &faux_client, &interceptor); 244 CreateMessagePipe(NULL, &faux_client, &interceptor);
142 245
143 scoped_refptr<MessageProcessor> processor = new MessageProcessor(); 246 scoped_refptr<MessageProcessor> processor = new MessageProcessor(
247 control_loop_proxy_);
144 mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe(); 248 mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe();
145 base::WorkerPool::PostTask( 249 base::WorkerPool::PostTask(
146 FROM_HERE, 250 FROM_HERE,
147 base::Bind(&MessageProcessor::Start, 251 base::Bind(&MessageProcessor::Start,
148 processor, 252 processor,
149 base::Passed(&real_handle), base::Passed(&interceptor)), 253 base::Passed(&real_handle), base::Passed(&interceptor),
254 url),
150 true); 255 true);
151 256
152 mojo::ServiceProviderPtr faux_provider; 257 mojo::ServiceProviderPtr faux_provider;
153 faux_provider.Bind(faux_client.Pass()); 258 faux_provider.Bind(faux_client.Pass());
154 return faux_provider.Pass(); 259 return faux_provider.Pass();
155 } 260 }
156 261
157 bool MustIntercept(const GURL& url) { 262 bool MustIntercept(const GURL& url) {
158 // TODO(cpu): manage who and when to intercept. 263 // TODO(cpu): manage who and when to intercept.
159 return true; 264 return true;
160 } 265 }
266
267 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
161 }; 268 };
162 269
163 spy::WebSocketServer* ws_server = NULL;
164
165 void StartServer(int port) { 270 void StartServer(int port) {
166 // TODO(cpu) figure out lifetime of the server. See Spy() dtor. 271 // TODO(cpu) figure out lifetime of the server. See Spy() dtor.
167 ws_server = new spy::WebSocketServer(port); 272 ws_server = new spy::WebSocketServer(port);
168 ws_server->Start(); 273 ws_server->Start();
169 } 274 }
170 275
171 struct SpyOptions { 276 struct SpyOptions {
172 int websocket_port; 277 int websocket_port;
173 278
174 SpyOptions() 279 SpyOptions()
(...skipping 25 matching lines...) Expand all
200 Spy::Spy(mojo::ServiceManager* service_manager, const std::string& options) { 305 Spy::Spy(mojo::ServiceManager* service_manager, const std::string& options) {
201 SpyOptions spy_options = ProcessOptions(options); 306 SpyOptions spy_options = ProcessOptions(options);
202 // Start the tread what will accept commands from the frontend. 307 // Start the tread what will accept commands from the frontend.
203 control_thread_.reset(new base::Thread("mojo_spy_control_thread")); 308 control_thread_.reset(new base::Thread("mojo_spy_control_thread"));
204 base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0); 309 base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0);
205 control_thread_->StartWithOptions(thread_options); 310 control_thread_->StartWithOptions(thread_options);
206 control_thread_->message_loop_proxy()->PostTask( 311 control_thread_->message_loop_proxy()->PostTask(
207 FROM_HERE, base::Bind(&StartServer, spy_options.websocket_port)); 312 FROM_HERE, base::Bind(&StartServer, spy_options.websocket_port));
208 313
209 // Start intercepting mojo services. 314 // Start intercepting mojo services.
210 service_manager->SetInterceptor(new SpyInterceptor()); 315 service_manager->SetInterceptor(new SpyInterceptor(
316 control_thread_->message_loop_proxy()));
211 } 317 }
212 318
213 Spy::~Spy(){ 319 Spy::~Spy(){
214 // TODO(cpu): Do not leak the interceptor. Lifetime between the 320 // TODO(cpu): Do not leak the interceptor. Lifetime between the
215 // service_manager and the spy is still unclear hence the leak. 321 // service_manager and the spy is still unclear hence the leak.
216 } 322 }
217 323
218 } // namespace mojo 324 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/spy/common.h ('k') | mojo/spy/websocket_server.h » ('j') | mojo/spy/websocket_server.cc » ('J')

Powered by Google App Engine
This is Rietveld 408576698