| Index: content/child/websocket_bridge.cc
|
| diff --git a/content/child/websocket_bridge.cc b/content/child/websocket_bridge.cc
|
| index 7cfff1ad50a942960024e79ac8d98377e363642d..6a37e05702743e57455bda21c71a5479fa67f2eb 100644
|
| --- a/content/child/websocket_bridge.cc
|
| +++ b/content/child/websocket_bridge.cc
|
| @@ -10,11 +10,13 @@
|
| #include <vector>
|
|
|
| #include "base/logging.h"
|
| +#include "base/macros.h"
|
| #include "base/strings/string_util.h"
|
| #include "content/child/child_thread_impl.h"
|
| #include "content/child/websocket_dispatcher.h"
|
| #include "content/common/websocket.h"
|
| #include "content/common/websocket_messages.h"
|
| +#include "mojo/message_pump/handle_watcher.h"
|
| #include "ipc/ipc_message.h"
|
| #include "ipc/ipc_message_macros.h"
|
| #include "third_party/WebKit/public/platform/WebSecurityOrigin.h"
|
| @@ -75,6 +77,13 @@ bool WebSocketBridge::OnMessageReceived(const IPC::Message& msg) {
|
| IPC_MESSAGE_HANDLER(WebSocketMsg_DropChannel, DidClose)
|
| IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyClosing,
|
| DidStartClosingHandshake)
|
| + IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_SetDataBuffer,
|
| + OnLoaderTransferTest_SetDataBuffer)
|
| + IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_Ack,
|
| + OnLoaderTransferTest_ReceivedAck)
|
| + IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_Done,
|
| + OnLoaderTransferTest_Done)
|
| +
|
| IPC_MESSAGE_UNHANDLED(handled = false)
|
| IPC_END_MESSAGE_MAP()
|
| return handled;
|
| @@ -290,4 +299,223 @@ void WebSocketBridge::Disconnect() {
|
| client_ = NULL;
|
| }
|
|
|
| +class WebSocketBridge::LoaderTestJob {
|
| + public:
|
| + LoaderTestJob(WebSocketBridge::LoaderTestIPC ipc,
|
| + scoped_ptr<blink::WebCallbacks<int, void>> callbacks,
|
| + size_t bucket_size,
|
| + size_t buffer_size,
|
| + size_t total_size,
|
| + WebSocketBridge* bridge)
|
| + : switch_(ipc),
|
| + callbacks_(std::move(callbacks)),
|
| + bucket_data_(std::vector<char>(bucket_size, 'a')),
|
| + written_size_(0),
|
| + buffer_size_(buffer_size),
|
| + total_size_(total_size),
|
| + bridge_(bridge) {}
|
| +
|
| + void StartWatching() {
|
| + handle_watcher_.Start(
|
| + writer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE,
|
| + base::Bind(&LoaderTestJob::OnWritable, base::Unretained(this)));
|
| + }
|
| +
|
| + void Start(LoaderTestService* service) {
|
| + start_time_ = base::Time::Now();
|
| + if (switch_ == WebSocketBridge::ViaChromiumIPC) {
|
| + StartChromiumIPCLoading();
|
| + } else {
|
| + StartMojoLoading(service);
|
| + }
|
| + }
|
| +
|
| + void StartChromiumIPCLoading() {
|
| + ChildThreadImpl::current()->Send(
|
| + new WebSocketHostMsg_LoaderTransferTest_Setup(bridge_->channel_id_,
|
| + buffer_size_));
|
| + }
|
| +
|
| + void OnSetDataBuffer(base::SharedMemoryHandle shm_handle) {
|
| + CHECK(base::SharedMemory::IsHandleValid(shm_handle));
|
| +
|
| + shm_buffer_.reset(new base::SharedMemory(shm_handle, false));
|
| +
|
| + bool ok = shm_buffer_->Map(buffer_size_);
|
| + CHECK(ok);
|
| +
|
| + while (Send()) {
|
| + }
|
| + }
|
| +
|
| + bool Send() {
|
| + size_t offset = 0;
|
| + size_t to_be_written = buffer_size_;
|
| + if (!in_flight_requets_.empty()) {
|
| + const auto& front = in_flight_requets_.front();
|
| + const auto& back = in_flight_requets_.back();
|
| + if (back.first + back.second != buffer_size_) {
|
| + offset = back.first + back.second;
|
| + to_be_written = std::min(to_be_written, buffer_size_ - offset);
|
| + }
|
| + if (offset <= front.first) {
|
| + to_be_written = std::min(to_be_written, front.first - offset);
|
| + }
|
| + }
|
| + size_t head = written_size_ % bucket_data_.size();
|
| + to_be_written =
|
| + std::min(to_be_written,
|
| + std::min(bucket_data_.size() - head,
|
| + total_size_ - written_size_));
|
| + if (to_be_written == 0)
|
| + return false;
|
| +
|
| + std::copy(bucket_data_.begin() + head,
|
| + bucket_data_.begin() + head + to_be_written,
|
| + static_cast<char*>(shm_buffer_->memory()) + offset);
|
| + written_size_ += to_be_written;
|
| + ChildThreadImpl::current()->Send(
|
| + new WebSocketHostMsg_LoaderTransferTest_Send(
|
| + bridge_->channel_id_, offset, to_be_written));
|
| +
|
| + in_flight_requets_.push_back(std::make_pair(offset, to_be_written));
|
| + if (written_size_ == total_size_) {
|
| + ChildThreadImpl::current()->Send(
|
| + new WebSocketHostMsg_LoaderTransferTest_Close(
|
| + bridge_->channel_id_));
|
| + }
|
| + return true;
|
| + }
|
| +
|
| + void OnReceivedAck() {
|
| + // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
|
| + in_flight_requets_.pop_front();
|
| + while (Send()) {
|
| + }
|
| + }
|
| +
|
| + void OnReceivedDone() {
|
| + CHECK(in_flight_requets_.empty());
|
| + callbacks_->onSuccess(
|
| + static_cast<int>((base::Time::Now() - start_time_).InMicroseconds()));
|
| + bridge_->loader_test_job_ = nullptr;
|
| + // |this| is deleted here.
|
| + }
|
| +
|
| + void StartMojoLoading(LoaderTestService* service) {
|
| + // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
|
| + MojoCreateDataPipeOptions options;
|
| + options.struct_size = sizeof(MojoCreateDataPipeOptions);
|
| + options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
|
| + options.element_num_bytes = 1;
|
| + options.capacity_num_bytes = buffer_size_;
|
| + mojo::DataPipe data_pipe(options);
|
| + mojo::ScopedDataPipeConsumerHandle handle;
|
| +
|
| + writer_ = std::move(data_pipe.producer_handle);
|
| + service->Transmit(
|
| + std::move(data_pipe.consumer_handle),
|
| + base::Bind(&LoaderTestJob::OnSuccess, base::Unretained(this)));
|
| + StartWatching();
|
| + }
|
| +
|
| + void OnWritable(MojoResult result) {
|
| + // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
|
| + while (true) {
|
| + void* buffer = nullptr;
|
| + size_t head = written_size_ % bucket_data_.size();
|
| + uint32_t available = 0;
|
| + MojoResult result = mojo::BeginWriteDataRaw(
|
| + writer_.get(), &buffer, &available, MOJO_WRITE_DATA_FLAG_NONE);
|
| + size_t to_be_written = std::min(
|
| + static_cast<size_t>(available),
|
| + std::min(bucket_data_.size() - head, total_size_ - written_size_));
|
| +
|
| + if (result == MOJO_RESULT_OK) {
|
| + std::copy(bucket_data_.begin() + head,
|
| + bucket_data_.begin() + head + to_be_written,
|
| + static_cast<char*>(buffer));
|
| + // fprintf(stderr, "Wrote %zu bytes\n",
|
| + // static_cast<size_t>(to_be_written));
|
| + mojo::EndWriteDataRaw(writer_.get(), to_be_written);
|
| + written_size_ += to_be_written;
|
| +
|
| + if (written_size_ == total_size_) {
|
| + // fprintf(stderr, "wrote %zu bytes in total: close\n",
|
| + // written_size_);
|
| + writer_.reset();
|
| + return;
|
| + }
|
| + } else if (result == MOJO_RESULT_SHOULD_WAIT) {
|
| + StartWatching();
|
| + break;
|
| + } else {
|
| + callbacks_->onError();
|
| + bridge_->loader_test_job_ = nullptr;
|
| + // |this| is deleted here.
|
| + return;
|
| + }
|
| + }
|
| + }
|
| +
|
| + void OnSuccess() {
|
| + callbacks_->onSuccess(
|
| + static_cast<int>((base::Time::Now() - start_time_).InMicroseconds()));
|
| + bridge_->loader_test_job_ = nullptr;
|
| + // |this| is deleted here.
|
| + }
|
| +
|
| + private:
|
| + WebSocketBridge::LoaderTestIPC switch_;
|
| + scoped_ptr<blink::WebCallbacks<int, void>> callbacks_;
|
| + std::vector<char> bucket_data_;
|
| + size_t written_size_;
|
| + size_t buffer_size_;
|
| + size_t total_size_;
|
| + WebSocketBridge* bridge_;
|
| +
|
| + // mojo-related
|
| + mojo::ScopedDataPipeProducerHandle writer_;
|
| + mojo::common::HandleWatcher handle_watcher_;
|
| +
|
| + //chromium-ipc-related
|
| + scoped_ptr<base::SharedMemory> shm_buffer_;
|
| + std::deque<std::pair<size_t, size_t>> in_flight_requets_; // <offset, size>
|
| +
|
| + base::Time start_time_;
|
| +};
|
| +
|
| +void WebSocketBridge::loaderTestTransmit(LoaderTestIPC ipc, bool verify_data, size_t bucket_size, size_t buffer_size, size_t total_size, blink::WebCallbacks<int, void>* raw_callbacks) {
|
| + scoped_ptr<blink::WebCallbacks<int, void>> callbacks(raw_callbacks);
|
| +
|
| + if (loader_test_job_) {
|
| + fprintf(stderr, "job exists\n");
|
| + callbacks->onError();
|
| + return;
|
| + }
|
| +
|
| + WebSocketDispatcher* dispatcher =
|
| + ChildThreadImpl::current()->websocket_dispatcher();
|
| + loader_test_job_.reset(new LoaderTestJob(
|
| + ipc, std::move(callbacks), bucket_size, buffer_size, total_size, this));
|
| + loader_test_job_->Start(dispatcher->loader_test_service());
|
| +}
|
| +
|
| +
|
| +void WebSocketBridge::OnLoaderTransferTest_SetDataBuffer(
|
| + base::SharedMemoryHandle shm_handle) {
|
| + if (loader_test_job_)
|
| + loader_test_job_->OnSetDataBuffer(shm_handle);
|
| +}
|
| +
|
| +void WebSocketBridge::OnLoaderTransferTest_ReceivedAck() {
|
| + if (loader_test_job_)
|
| + loader_test_job_->OnReceivedAck();
|
| +}
|
| +
|
| +void WebSocketBridge::OnLoaderTransferTest_Done() {
|
| + if (loader_test_job_)
|
| + loader_test_job_->OnReceivedDone();
|
| +}
|
| +
|
| } // namespace content
|
|
|