Index: mojo/spy/spy.cc |
diff --git a/mojo/spy/spy.cc b/mojo/spy/spy.cc |
index fbc8120e6a766945b92639e9ea266ea222b86869..6c804690764bba0c252e9fcd35a7825f837e2304 100644 |
--- a/mojo/spy/spy.cc |
+++ b/mojo/spy/spy.cc |
@@ -9,15 +9,17 @@ |
#include "base/bind.h" |
#include "base/compiler_specific.h" |
#include "base/location.h" |
+#include "base/logging.h" |
#include "base/memory/ref_counted.h" |
#include "base/message_loop/message_loop_proxy.h" |
#include "base/strings/string_number_conversions.h" |
#include "base/strings/string_split.h" |
#include "base/threading/thread.h" |
#include "base/threading/worker_pool.h" |
- |
+#include "base/time/time.h" |
#include "mojo/public/cpp/system/core.h" |
#include "mojo/service_manager/service_manager.h" |
+#include "mojo/spy/common.h" |
#include "mojo/spy/public/spy.mojom.h" |
#include "mojo/spy/spy_server_impl.h" |
#include "mojo/spy/websocket_server.h" |
@@ -25,6 +27,8 @@ |
namespace { |
+mojo::WebSocketServer* ws_server = NULL; |
+ |
const size_t kMessageBufSize = 2 * 1024; |
const size_t kHandleBufSize = 64; |
const int kDefaultWebSocketPort = 42424; |
@@ -39,9 +43,11 @@ void CloseHandles(MojoHandle* handles, size_t count) { |
class MessageProcessor : |
public base::RefCountedThreadSafe<MessageProcessor> { |
public: |
- MessageProcessor() |
+ MessageProcessor(base::MessageLoopProxy* control_loop_proxy) |
: last_result_(MOJO_RESULT_OK), |
- bytes_transfered_(0) { |
+ bytes_transfered_(0), |
+ control_loop_proxy_(control_loop_proxy), |
+ service_vendor_message_pipe_received_(false) { |
message_count_[0] = 0; |
message_count_[1] = 0; |
handle_count_[0] = 0; |
@@ -49,7 +55,8 @@ class MessageProcessor : |
} |
void Start(mojo::ScopedMessagePipeHandle client, |
- mojo::ScopedMessagePipeHandle interceptor) { |
+ mojo::ScopedMessagePipeHandle interceptor, |
+ const GURL& url) { |
std::vector<mojo::MessagePipeHandle> pipes; |
pipes.push_back(client.get()); |
pipes.push_back(interceptor.get()); |
@@ -86,12 +93,42 @@ class MessageProcessor : |
if (!bytes_read && !handles_read) |
continue; |
- if (handles_read) |
+ if (handles_read) { |
handle_count_[r] += handles_read; |
+ // Intercept the first set of handles to message pipes with the |
+ // assumption that these would be used for vending mojo services. |
+ // TODO(ananta) |
+ // The above approach is hacky and could cause us to miss other message |
+ // pipes which could be exchanged between the client and the server. |
+ // Look into a cleaner way of identifying message pipe handles. |
+ if (!service_vendor_message_pipe_received_) { |
+ service_vendor_message_pipe_received_ = true; |
+ for (uint32_t i = 0; i < handles_read; i++) { |
+ mojo::ScopedMessagePipeHandle message_pipe_handle; |
+ message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i])); |
+ |
+ mojo::ScopedMessagePipeHandle faux_client; |
+ mojo::ScopedMessagePipeHandle interceptor; |
+ CreateMessagePipe(NULL, &faux_client, &interceptor); |
+ |
+ base::WorkerPool::PostTask( |
+ FROM_HERE, |
+ base::Bind(&MessageProcessor::Start, |
+ this, |
+ base::Passed(&message_pipe_handle), |
+ base::Passed(&interceptor), |
+ url), |
+ true); |
+ hbuf.get()[i] = faux_client.release().value(); |
+ } |
+ } |
+ } |
++message_count_[r]; |
bytes_transfered_ += bytes_read; |
+ LogMessageInfo(mbuf.get(), url); |
+ |
mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0]; |
if (!CheckResult(Wait(write_handle, |
MOJO_HANDLE_SIGNAL_WRITABLE, |
@@ -121,18 +158,80 @@ class MessageProcessor : |
return false; |
} |
+ void LogInvalidMessage(const mojo::MojoMessageHeader& header) { |
+ LOG(ERROR) << "Invalid message: Number of Fields: " |
+ << header.num_fields |
+ << " Number of bytes: " |
+ << header.num_bytes |
+ << " Flags: " |
+ << header.flags; |
+ } |
+ |
+ // Validates the message as per the mojo spec. |
+ bool IsValidMessage(const mojo::MojoMessageHeader& header) { |
+ if (header.num_fields == 2) { |
+ if (header.num_bytes != sizeof(mojo::MojoMessageHeader)) { |
+ LogInvalidMessage(header); |
+ return false; |
+ } |
+ } else if (header.num_fields == 3) { |
+ if (header.num_bytes != sizeof(mojo::MojoRequestHeader)) { |
+ LogInvalidMessage(header); |
+ } |
+ } else if (header.num_fields > 3) { |
+ if (header.num_bytes < sizeof(mojo::MojoRequestHeader)) { |
+ LogInvalidMessage(header); |
+ return false; |
+ } |
+ } |
+ // These flags should be specified in request or response messages. |
+ if (header.num_fields < 3 && |
+ ((header.flags & mojo::kMessageExpectsResponse) || |
+ (header.flags & mojo::kMessageIsResponse))) { |
+ LOG(ERROR) << "Invalid request message."; |
+ LogInvalidMessage(header); |
+ return false; |
+ } |
+ // These flags are mutually exclusive. |
+ if ((header.flags & mojo::kMessageExpectsResponse) && |
+ (header.flags & mojo::kMessageIsResponse)) { |
+ LOG(ERROR) << "Invalid flags combination in request message."; |
+ LogInvalidMessage(header); |
+ return false; |
+ } |
+ return true; |
+ } |
+ |
+ void LogMessageInfo(void* data, const GURL& url) { |
+ mojo::MojoMessageData* message_data = |
+ reinterpret_cast<mojo::MojoMessageData*>(data); |
+ if (IsValidMessage(message_data->header)) { |
+ control_loop_proxy_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&mojo::WebSocketServer::LogMessageInfo, |
+ base::Unretained(ws_server), |
+ message_data->header, url, base::Time::Now())); |
+ } |
+ } |
+ |
MojoResult last_result_; |
uint32_t bytes_transfered_; |
uint32_t message_count_[2]; |
uint32_t handle_count_[2]; |
+ scoped_refptr<base::MessageLoopProxy> control_loop_proxy_; |
+ // This flag helps us intercept the first message pipe exchanged between |
+ // the client and the service vendor. |
+ bool service_vendor_message_pipe_received_; |
}; |
// In charge of intercepting access to the service manager. |
class SpyInterceptor : public mojo::ServiceManager::Interceptor { |
public: |
- explicit SpyInterceptor(scoped_refptr<mojo::SpyServerImpl> spy_server) |
+ explicit SpyInterceptor(scoped_refptr<mojo::SpyServerImpl> spy_server, |
+ base::MessageLoopProxy* control_loop_proxy) |
: spy_server_(spy_server), |
- proxy_(base::MessageLoopProxy::current()) { |
+ proxy_(base::MessageLoopProxy::current()), |
+ control_loop_proxy_(control_loop_proxy){ |
} |
private: |
@@ -150,13 +249,15 @@ class SpyInterceptor : public mojo::ServiceManager::Interceptor { |
mojo::ScopedMessagePipeHandle interceptor; |
CreateMessagePipe(NULL, &faux_client, &interceptor); |
- scoped_refptr<MessageProcessor> processor = new MessageProcessor(); |
+ scoped_refptr<MessageProcessor> processor = new MessageProcessor( |
+ control_loop_proxy_); |
mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe(); |
base::WorkerPool::PostTask( |
FROM_HERE, |
base::Bind(&MessageProcessor::Start, |
processor, |
- base::Passed(&real_handle), base::Passed(&interceptor)), |
+ base::Passed(&real_handle), base::Passed(&interceptor), |
+ url), |
true); |
mojo::ServiceProviderPtr faux_provider; |
@@ -174,10 +275,9 @@ class SpyInterceptor : public mojo::ServiceManager::Interceptor { |
scoped_refptr<mojo::SpyServerImpl> spy_server_; |
scoped_refptr<base::MessageLoopProxy> proxy_; |
+ scoped_refptr<base::MessageLoopProxy> control_loop_proxy_; |
}; |
-mojo::WebSocketServer* ws_server = NULL; |
- |
void StartWebServer(int port, mojo::ScopedMessagePipeHandle pipe) { |
// TODO(cpu) figure out lifetime of the server. See Spy() dtor. |
ws_server = new mojo::WebSocketServer(port, pipe.Pass()); |
@@ -228,7 +328,8 @@ Spy::Spy(mojo::ServiceManager* service_manager, const std::string& options) { |
base::Passed(spy_server_->ServerPipe()))); |
// Start intercepting mojo services. |
- service_manager->SetInterceptor(new SpyInterceptor(spy_server_)); |
+ service_manager->SetInterceptor(new SpyInterceptor( |
+ spy_server_, control_thread_->message_loop_proxy())); |
} |
Spy::~Spy() { |