| Index: mojo/spy/spy.cc
|
| diff --git a/mojo/spy/spy.cc b/mojo/spy/spy.cc
|
| index 38310ec040d69d232c379601de042081ddf8ea40..82844264c09bdc29e916c6bb0ee579b497a136aa 100644
|
| --- a/mojo/spy/spy.cc
|
| +++ b/mojo/spy/spy.cc
|
| @@ -7,7 +7,9 @@
|
| #include "base/bind.h"
|
| #include "base/compiler_specific.h"
|
| #include "base/location.h"
|
| +#include "base/logging.h"
|
| #include "base/memory/ref_counted.h"
|
| +#include "base/time/time.h"
|
| #include "base/strings/string_number_conversions.h"
|
| #include "base/strings/string_split.h"
|
| #include "base/threading/thread.h"
|
| @@ -15,10 +17,13 @@
|
|
|
| #include "mojo/public/cpp/system/core.h"
|
| #include "mojo/service_manager/service_manager.h"
|
| +#include "mojo/spy/common.h"
|
| #include "mojo/spy/websocket_server.h"
|
|
|
| namespace {
|
|
|
| +spy::WebSocketServer* ws_server = NULL;
|
| +
|
| const size_t kMessageBufSize = 2 * 1024;
|
| const size_t kHandleBufSize = 64;
|
| const int kDefaultWebSocketPort = 42424;
|
| @@ -34,9 +39,11 @@ class MessageProcessor :
|
| public base::RefCountedThreadSafe<MessageProcessor> {
|
| public:
|
|
|
| - MessageProcessor()
|
| + MessageProcessor(base::MessageLoopProxy* control_loop_proxy)
|
| : last_result_(MOJO_RESULT_OK),
|
| - bytes_transfered_(0) {
|
| + bytes_transfered_(0),
|
| + control_loop_proxy_(control_loop_proxy),
|
| + service_vendor_message_pipe_received_(false) {
|
|
|
| message_count_[0] = 0;
|
| message_count_[1] = 0;
|
| @@ -45,7 +52,8 @@ class MessageProcessor :
|
| }
|
|
|
| void Start(mojo::ScopedMessagePipeHandle client,
|
| - mojo::ScopedMessagePipeHandle interceptor) {
|
| + mojo::ScopedMessagePipeHandle interceptor,
|
| + const GURL& url) {
|
| std::vector<mojo::MessagePipeHandle> pipes;
|
| pipes.push_back(client.get());
|
| pipes.push_back(interceptor.get());
|
| @@ -82,12 +90,42 @@ class MessageProcessor :
|
| if (!bytes_read && !handles_read)
|
| continue;
|
|
|
| - if (handles_read)
|
| + if (handles_read) {
|
| handle_count_[r] += handles_read;
|
|
|
| + // Intercept the first set of handles to message pipes with the
|
| + // assumption that these would be used for vending mojo services.
|
| + // TODO(ananta)
|
| + // The above approach is hacky and could cause us to miss other message
|
| + // pipes which could be exchanged between the client and the server.
|
| + // Look into a cleaner way of identifying message pipe handles.
|
| + if (!service_vendor_message_pipe_received_) {
|
| + service_vendor_message_pipe_received_ = true;
|
| + for (uint32_t i = 0; i < handles_read; i++) {
|
| + mojo::ScopedMessagePipeHandle message_pipe_handle;
|
| + message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i]));
|
| +
|
| + mojo::ScopedMessagePipeHandle faux_client;
|
| + mojo::ScopedMessagePipeHandle interceptor;
|
| + CreateMessagePipe(NULL, &faux_client, &interceptor);
|
| +
|
| + base::WorkerPool::PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&MessageProcessor::Start,
|
| + this,
|
| + base::Passed(&message_pipe_handle),
|
| + base::Passed(&interceptor),
|
| + url),
|
| + true);
|
| + hbuf.get()[i] = faux_client.release().value();
|
| + }
|
| + }
|
| + }
|
| ++message_count_[r];
|
| bytes_transfered_ += bytes_read;
|
|
|
| + LogMessageInfo(mbuf.get(), url);
|
| +
|
| mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0];
|
| if (!CheckResult(Wait(write_handle,
|
| MOJO_HANDLE_SIGNAL_WRITABLE,
|
| @@ -107,24 +145,89 @@ class MessageProcessor :
|
| }
|
|
|
| private:
|
| - friend class base::RefCountedThreadSafe<MessageProcessor>;
|
| - virtual ~MessageProcessor() {}
|
| -
|
| - bool CheckResult(MojoResult mr) {
|
| - if (mr == MOJO_RESULT_OK)
|
| - return true;
|
| - last_result_ = mr;
|
| - return false;
|
| - }
|
| -
|
| - MojoResult last_result_;
|
| - uint32_t bytes_transfered_;
|
| - uint32_t message_count_[2];
|
| - uint32_t handle_count_[2];
|
| + friend class base::RefCountedThreadSafe<MessageProcessor>;
|
| + virtual ~MessageProcessor() {}
|
| +
|
| + bool CheckResult(MojoResult mr) {
|
| + if (mr == MOJO_RESULT_OK)
|
| + return true;
|
| + last_result_ = mr;
|
| + return false;
|
| + }
|
| +
|
| + void LogInvalidMessage(const mojo::MojoMessageHeader& header) {
|
| + LOG(ERROR) << "Invalid message: Number of Fields: "
|
| + << header.num_fields
|
| + << " Number of bytes: "
|
| + << header.num_bytes
|
| + << " Flags: "
|
| + << header.flags;
|
| + }
|
| +
|
| + // Validates the message as per the mojo spec.
|
| + bool IsValidMessage(const mojo::MojoMessageHeader& header) {
|
| + if (header.num_fields == 2) {
|
| + if (header.num_bytes != sizeof(mojo::MojoMessageHeader)) {
|
| + LogInvalidMessage(header);
|
| + return false;
|
| + }
|
| + } else if (header.num_fields == 3) {
|
| + if (header.num_bytes != sizeof(mojo::MojoRequestHeader)) {
|
| + LogInvalidMessage(header);
|
| + }
|
| + } else if (header.num_fields > 3) {
|
| + if (header.num_bytes < sizeof(mojo::MojoRequestHeader)) {
|
| + LogInvalidMessage(header);
|
| + return false;
|
| + }
|
| + }
|
| +
|
| + // These flags should be specified in request or response messages.
|
| + if (header.num_fields < 3 &&
|
| + ((header.flags & mojo::kMessageExpectsResponse) ||
|
| + (header.flags & mojo::kMessageIsResponse))) {
|
| + LOG(ERROR) << "Invalid request message.";
|
| + LogInvalidMessage(header);
|
| + return false;
|
| + }
|
| + // These flags are mutually exclusive.
|
| + if ((header.flags & mojo::kMessageExpectsResponse) &&
|
| + (header.flags & mojo::kMessageIsResponse)) {
|
| + LOG(ERROR) << "Invalid flags combination in request message.";
|
| + LogInvalidMessage(header);
|
| + return false;
|
| + }
|
| + return true;
|
| + }
|
| +
|
| + void LogMessageInfo(void* data, const GURL& url) {
|
| + mojo::MojoMessageData* message_data =
|
| + reinterpret_cast<mojo::MojoMessageData*>(data);
|
| + if (IsValidMessage(message_data->header)) {
|
| + control_loop_proxy_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&spy::WebSocketServer::LogMessageInfo,
|
| + base::Unretained(ws_server),
|
| + message_data->header, url, base::Time::Now()));
|
| + }
|
| + }
|
| +
|
| + MojoResult last_result_;
|
| + uint32_t bytes_transfered_;
|
| + uint32_t message_count_[2];
|
| + uint32_t handle_count_[2];
|
| + scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
|
| + // This flag helps us intercept the first message pipe exchanged between
|
| + // the client and the service vendor.
|
| + bool service_vendor_message_pipe_received_;
|
| };
|
|
|
| // In charge of intercepting access to the service manager.
|
| class SpyInterceptor : public mojo::ServiceManager::Interceptor {
|
| + public:
|
| + SpyInterceptor(base::MessageLoopProxy* control_loop_proxy)
|
| + : control_loop_proxy_(control_loop_proxy) {}
|
| +
|
| private:
|
| virtual mojo::ServiceProviderPtr OnConnectToClient(
|
| const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE {
|
| @@ -140,13 +243,15 @@ class SpyInterceptor : public mojo::ServiceManager::Interceptor {
|
| mojo::ScopedMessagePipeHandle interceptor;
|
| CreateMessagePipe(NULL, &faux_client, &interceptor);
|
|
|
| - scoped_refptr<MessageProcessor> processor = new MessageProcessor();
|
| + scoped_refptr<MessageProcessor> processor = new MessageProcessor(
|
| + control_loop_proxy_);
|
| mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe();
|
| base::WorkerPool::PostTask(
|
| FROM_HERE,
|
| base::Bind(&MessageProcessor::Start,
|
| processor,
|
| - base::Passed(&real_handle), base::Passed(&interceptor)),
|
| + base::Passed(&real_handle), base::Passed(&interceptor),
|
| + url),
|
| true);
|
|
|
| mojo::ServiceProviderPtr faux_provider;
|
| @@ -158,9 +263,9 @@ class SpyInterceptor : public mojo::ServiceManager::Interceptor {
|
| // TODO(cpu): manage who and when to intercept.
|
| return true;
|
| }
|
| -};
|
|
|
| -spy::WebSocketServer* ws_server = NULL;
|
| + scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
|
| +};
|
|
|
| void StartServer(int port) {
|
| // TODO(cpu) figure out lifetime of the server. See Spy() dtor.
|
| @@ -207,7 +312,8 @@ Spy::Spy(mojo::ServiceManager* service_manager, const std::string& options) {
|
| FROM_HERE, base::Bind(&StartServer, spy_options.websocket_port));
|
|
|
| // Start intercepting mojo services.
|
| - service_manager->SetInterceptor(new SpyInterceptor());
|
| + service_manager->SetInterceptor(new SpyInterceptor(
|
| + control_thread_->message_loop_proxy()));
|
| }
|
|
|
| Spy::~Spy(){
|
|
|