Index: content/child/websocket_bridge.cc |
diff --git a/content/child/websocket_bridge.cc b/content/child/websocket_bridge.cc |
index 7cfff1ad50a942960024e79ac8d98377e363642d..6a37e05702743e57455bda21c71a5479fa67f2eb 100644 |
--- a/content/child/websocket_bridge.cc |
+++ b/content/child/websocket_bridge.cc |
@@ -10,11 +10,13 @@ |
#include <vector> |
#include "base/logging.h" |
+#include "base/macros.h" |
#include "base/strings/string_util.h" |
#include "content/child/child_thread_impl.h" |
#include "content/child/websocket_dispatcher.h" |
#include "content/common/websocket.h" |
#include "content/common/websocket_messages.h" |
+#include "mojo/message_pump/handle_watcher.h" |
#include "ipc/ipc_message.h" |
#include "ipc/ipc_message_macros.h" |
#include "third_party/WebKit/public/platform/WebSecurityOrigin.h" |
@@ -75,6 +77,13 @@ bool WebSocketBridge::OnMessageReceived(const IPC::Message& msg) { |
IPC_MESSAGE_HANDLER(WebSocketMsg_DropChannel, DidClose) |
IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyClosing, |
DidStartClosingHandshake) |
+ IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_SetDataBuffer, |
+ OnLoaderTransferTest_SetDataBuffer) |
+ IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_Ack, |
+ OnLoaderTransferTest_ReceivedAck) |
+ IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_Done, |
+ OnLoaderTransferTest_Done) |
+ |
IPC_MESSAGE_UNHANDLED(handled = false) |
IPC_END_MESSAGE_MAP() |
return handled; |
@@ -290,4 +299,223 @@ void WebSocketBridge::Disconnect() { |
client_ = NULL; |
} |
+class WebSocketBridge::LoaderTestJob { |
+ public: |
+ LoaderTestJob(WebSocketBridge::LoaderTestIPC ipc, |
+ scoped_ptr<blink::WebCallbacks<int, void>> callbacks, |
+ size_t bucket_size, |
+ size_t buffer_size, |
+ size_t total_size, |
+ WebSocketBridge* bridge) |
+ : switch_(ipc), |
+ callbacks_(std::move(callbacks)), |
+ bucket_data_(std::vector<char>(bucket_size, 'a')), |
+ written_size_(0), |
+ buffer_size_(buffer_size), |
+ total_size_(total_size), |
+ bridge_(bridge) {} |
+ |
+ void StartWatching() { |
+ handle_watcher_.Start( |
+ writer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, |
+ base::Bind(&LoaderTestJob::OnWritable, base::Unretained(this))); |
+ } |
+ |
+ void Start(LoaderTestService* service) { |
+ start_time_ = base::Time::Now(); |
+ if (switch_ == WebSocketBridge::ViaChromiumIPC) { |
+ StartChromiumIPCLoading(); |
+ } else { |
+ StartMojoLoading(service); |
+ } |
+ } |
+ |
+ void StartChromiumIPCLoading() { |
+ ChildThreadImpl::current()->Send( |
+ new WebSocketHostMsg_LoaderTransferTest_Setup(bridge_->channel_id_, |
+ buffer_size_)); |
+ } |
+ |
+ void OnSetDataBuffer(base::SharedMemoryHandle shm_handle) { |
+ CHECK(base::SharedMemory::IsHandleValid(shm_handle)); |
+ |
+ shm_buffer_.reset(new base::SharedMemory(shm_handle, false)); |
+ |
+ bool ok = shm_buffer_->Map(buffer_size_); |
+ CHECK(ok); |
+ |
+ while (Send()) { |
+ } |
+ } |
+ |
+ bool Send() { |
+ size_t offset = 0; |
+ size_t to_be_written = buffer_size_; |
+ if (!in_flight_requets_.empty()) { |
+ const auto& front = in_flight_requets_.front(); |
+ const auto& back = in_flight_requets_.back(); |
+ if (back.first + back.second != buffer_size_) { |
+ offset = back.first + back.second; |
+ to_be_written = std::min(to_be_written, buffer_size_ - offset); |
+ } |
+ if (offset <= front.first) { |
+ to_be_written = std::min(to_be_written, front.first - offset); |
+ } |
+ } |
+ size_t head = written_size_ % bucket_data_.size(); |
+ to_be_written = |
+ std::min(to_be_written, |
+ std::min(bucket_data_.size() - head, |
+ total_size_ - written_size_)); |
+ if (to_be_written == 0) |
+ return false; |
+ |
+ std::copy(bucket_data_.begin() + head, |
+ bucket_data_.begin() + head + to_be_written, |
+ static_cast<char*>(shm_buffer_->memory()) + offset); |
+ written_size_ += to_be_written; |
+ ChildThreadImpl::current()->Send( |
+ new WebSocketHostMsg_LoaderTransferTest_Send( |
+ bridge_->channel_id_, offset, to_be_written)); |
+ |
+ in_flight_requets_.push_back(std::make_pair(offset, to_be_written)); |
+ if (written_size_ == total_size_) { |
+ ChildThreadImpl::current()->Send( |
+ new WebSocketHostMsg_LoaderTransferTest_Close( |
+ bridge_->channel_id_)); |
+ } |
+ return true; |
+ } |
+ |
+ void OnReceivedAck() { |
+ // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__); |
+ in_flight_requets_.pop_front(); |
+ while (Send()) { |
+ } |
+ } |
+ |
+ void OnReceivedDone() { |
+ CHECK(in_flight_requets_.empty()); |
+ callbacks_->onSuccess( |
+ static_cast<int>((base::Time::Now() - start_time_).InMicroseconds())); |
+ bridge_->loader_test_job_ = nullptr; |
+ // |this| is deleted here. |
+ } |
+ |
+ void StartMojoLoading(LoaderTestService* service) { |
+ // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__); |
+ MojoCreateDataPipeOptions options; |
+ options.struct_size = sizeof(MojoCreateDataPipeOptions); |
+ options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; |
+ options.element_num_bytes = 1; |
+ options.capacity_num_bytes = buffer_size_; |
+ mojo::DataPipe data_pipe(options); |
+ mojo::ScopedDataPipeConsumerHandle handle; |
+ |
+ writer_ = std::move(data_pipe.producer_handle); |
+ service->Transmit( |
+ std::move(data_pipe.consumer_handle), |
+ base::Bind(&LoaderTestJob::OnSuccess, base::Unretained(this))); |
+ StartWatching(); |
+ } |
+ |
+ void OnWritable(MojoResult result) { |
+ // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__); |
+ while (true) { |
+ void* buffer = nullptr; |
+ size_t head = written_size_ % bucket_data_.size(); |
+ uint32_t available = 0; |
+ MojoResult result = mojo::BeginWriteDataRaw( |
+ writer_.get(), &buffer, &available, MOJO_WRITE_DATA_FLAG_NONE); |
+ size_t to_be_written = std::min( |
+ static_cast<size_t>(available), |
+ std::min(bucket_data_.size() - head, total_size_ - written_size_)); |
+ |
+ if (result == MOJO_RESULT_OK) { |
+ std::copy(bucket_data_.begin() + head, |
+ bucket_data_.begin() + head + to_be_written, |
+ static_cast<char*>(buffer)); |
+ // fprintf(stderr, "Wrote %zu bytes\n", |
+ // static_cast<size_t>(to_be_written)); |
+ mojo::EndWriteDataRaw(writer_.get(), to_be_written); |
+ written_size_ += to_be_written; |
+ |
+ if (written_size_ == total_size_) { |
+ // fprintf(stderr, "wrote %zu bytes in total: close\n", |
+ // written_size_); |
+ writer_.reset(); |
+ return; |
+ } |
+ } else if (result == MOJO_RESULT_SHOULD_WAIT) { |
+ StartWatching(); |
+ break; |
+ } else { |
+ callbacks_->onError(); |
+ bridge_->loader_test_job_ = nullptr; |
+ // |this| is deleted here. |
+ return; |
+ } |
+ } |
+ } |
+ |
+ void OnSuccess() { |
+ callbacks_->onSuccess( |
+ static_cast<int>((base::Time::Now() - start_time_).InMicroseconds())); |
+ bridge_->loader_test_job_ = nullptr; |
+ // |this| is deleted here. |
+ } |
+ |
+ private: |
+ WebSocketBridge::LoaderTestIPC switch_; |
+ scoped_ptr<blink::WebCallbacks<int, void>> callbacks_; |
+ std::vector<char> bucket_data_; |
+ size_t written_size_; |
+ size_t buffer_size_; |
+ size_t total_size_; |
+ WebSocketBridge* bridge_; |
+ |
+ // mojo-related |
+ mojo::ScopedDataPipeProducerHandle writer_; |
+ mojo::common::HandleWatcher handle_watcher_; |
+ |
+ //chromium-ipc-related |
+ scoped_ptr<base::SharedMemory> shm_buffer_; |
+ std::deque<std::pair<size_t, size_t>> in_flight_requets_; // <offset, size> |
+ |
+ base::Time start_time_; |
+}; |
+ |
+void WebSocketBridge::loaderTestTransmit(LoaderTestIPC ipc, bool verify_data, size_t bucket_size, size_t buffer_size, size_t total_size, blink::WebCallbacks<int, void>* raw_callbacks) { |
+ scoped_ptr<blink::WebCallbacks<int, void>> callbacks(raw_callbacks); |
+ |
+ if (loader_test_job_) { |
+ fprintf(stderr, "job exists\n"); |
+ callbacks->onError(); |
+ return; |
+ } |
+ |
+ WebSocketDispatcher* dispatcher = |
+ ChildThreadImpl::current()->websocket_dispatcher(); |
+ loader_test_job_.reset(new LoaderTestJob( |
+ ipc, std::move(callbacks), bucket_size, buffer_size, total_size, this)); |
+ loader_test_job_->Start(dispatcher->loader_test_service()); |
+} |
+ |
+ |
+void WebSocketBridge::OnLoaderTransferTest_SetDataBuffer( |
+ base::SharedMemoryHandle shm_handle) { |
+ if (loader_test_job_) |
+ loader_test_job_->OnSetDataBuffer(shm_handle); |
+} |
+ |
+void WebSocketBridge::OnLoaderTransferTest_ReceivedAck() { |
+ if (loader_test_job_) |
+ loader_test_job_->OnReceivedAck(); |
+} |
+ |
+void WebSocketBridge::OnLoaderTransferTest_Done() { |
+ if (loader_test_job_) |
+ loader_test_job_->OnReceivedDone(); |
+} |
+ |
} // namespace content |