Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1859)

Unified Diff: content/child/websocket_bridge.cc

Issue 1461283002: [DO NOT COMMIT] mojo datapipe performance measurement Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « content/child/websocket_bridge.h ('k') | content/child/websocket_dispatcher.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/child/websocket_bridge.cc
diff --git a/content/child/websocket_bridge.cc b/content/child/websocket_bridge.cc
index 7cfff1ad50a942960024e79ac8d98377e363642d..6a37e05702743e57455bda21c71a5479fa67f2eb 100644
--- a/content/child/websocket_bridge.cc
+++ b/content/child/websocket_bridge.cc
@@ -10,11 +10,13 @@
#include <vector>
#include "base/logging.h"
+#include "base/macros.h"
#include "base/strings/string_util.h"
#include "content/child/child_thread_impl.h"
#include "content/child/websocket_dispatcher.h"
#include "content/common/websocket.h"
#include "content/common/websocket_messages.h"
+#include "mojo/message_pump/handle_watcher.h"
#include "ipc/ipc_message.h"
#include "ipc/ipc_message_macros.h"
#include "third_party/WebKit/public/platform/WebSecurityOrigin.h"
@@ -75,6 +77,13 @@ bool WebSocketBridge::OnMessageReceived(const IPC::Message& msg) {
IPC_MESSAGE_HANDLER(WebSocketMsg_DropChannel, DidClose)
IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyClosing,
DidStartClosingHandshake)
+ IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_SetDataBuffer,
+ OnLoaderTransferTest_SetDataBuffer)
+ IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_Ack,
+ OnLoaderTransferTest_ReceivedAck)
+ IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_Done,
+ OnLoaderTransferTest_Done)
+
IPC_MESSAGE_UNHANDLED(handled = false)
IPC_END_MESSAGE_MAP()
return handled;
@@ -290,4 +299,223 @@ void WebSocketBridge::Disconnect() {
client_ = NULL;
}
+class WebSocketBridge::LoaderTestJob {
+ public:
+ LoaderTestJob(WebSocketBridge::LoaderTestIPC ipc,
+ scoped_ptr<blink::WebCallbacks<int, void>> callbacks,
+ size_t bucket_size,
+ size_t buffer_size,
+ size_t total_size,
+ WebSocketBridge* bridge)
+ : switch_(ipc),
+ callbacks_(std::move(callbacks)),
+ bucket_data_(std::vector<char>(bucket_size, 'a')),
+ written_size_(0),
+ buffer_size_(buffer_size),
+ total_size_(total_size),
+ bridge_(bridge) {}
+
+ void StartWatching() {
+ handle_watcher_.Start(
+ writer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE,
+ base::Bind(&LoaderTestJob::OnWritable, base::Unretained(this)));
+ }
+
+ void Start(LoaderTestService* service) {
+ start_time_ = base::Time::Now();
+ if (switch_ == WebSocketBridge::ViaChromiumIPC) {
+ StartChromiumIPCLoading();
+ } else {
+ StartMojoLoading(service);
+ }
+ }
+
+ void StartChromiumIPCLoading() {
+ ChildThreadImpl::current()->Send(
+ new WebSocketHostMsg_LoaderTransferTest_Setup(bridge_->channel_id_,
+ buffer_size_));
+ }
+
+ void OnSetDataBuffer(base::SharedMemoryHandle shm_handle) {
+ CHECK(base::SharedMemory::IsHandleValid(shm_handle));
+
+ shm_buffer_.reset(new base::SharedMemory(shm_handle, false));
+
+ bool ok = shm_buffer_->Map(buffer_size_);
+ CHECK(ok);
+
+ while (Send()) {
+ }
+ }
+
+ bool Send() {
+ size_t offset = 0;
+ size_t to_be_written = buffer_size_;
+ if (!in_flight_requets_.empty()) {
+ const auto& front = in_flight_requets_.front();
+ const auto& back = in_flight_requets_.back();
+ if (back.first + back.second != buffer_size_) {
+ offset = back.first + back.second;
+ to_be_written = std::min(to_be_written, buffer_size_ - offset);
+ }
+ if (offset <= front.first) {
+ to_be_written = std::min(to_be_written, front.first - offset);
+ }
+ }
+ size_t head = written_size_ % bucket_data_.size();
+ to_be_written =
+ std::min(to_be_written,
+ std::min(bucket_data_.size() - head,
+ total_size_ - written_size_));
+ if (to_be_written == 0)
+ return false;
+
+ std::copy(bucket_data_.begin() + head,
+ bucket_data_.begin() + head + to_be_written,
+ static_cast<char*>(shm_buffer_->memory()) + offset);
+ written_size_ += to_be_written;
+ ChildThreadImpl::current()->Send(
+ new WebSocketHostMsg_LoaderTransferTest_Send(
+ bridge_->channel_id_, offset, to_be_written));
+
+ in_flight_requets_.push_back(std::make_pair(offset, to_be_written));
+ if (written_size_ == total_size_) {
+ ChildThreadImpl::current()->Send(
+ new WebSocketHostMsg_LoaderTransferTest_Close(
+ bridge_->channel_id_));
+ }
+ return true;
+ }
+
+ void OnReceivedAck() {
+ // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
+ in_flight_requets_.pop_front();
+ while (Send()) {
+ }
+ }
+
+ void OnReceivedDone() {
+ CHECK(in_flight_requets_.empty());
+ callbacks_->onSuccess(
+ static_cast<int>((base::Time::Now() - start_time_).InMicroseconds()));
+ bridge_->loader_test_job_ = nullptr;
+ // |this| is deleted here.
+ }
+
+ void StartMojoLoading(LoaderTestService* service) {
+ // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
+ MojoCreateDataPipeOptions options;
+ options.struct_size = sizeof(MojoCreateDataPipeOptions);
+ options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
+ options.element_num_bytes = 1;
+ options.capacity_num_bytes = buffer_size_;
+ mojo::DataPipe data_pipe(options);
+ mojo::ScopedDataPipeConsumerHandle handle;
+
+ writer_ = std::move(data_pipe.producer_handle);
+ service->Transmit(
+ std::move(data_pipe.consumer_handle),
+ base::Bind(&LoaderTestJob::OnSuccess, base::Unretained(this)));
+ StartWatching();
+ }
+
+ void OnWritable(MojoResult result) {
+ // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
+ while (true) {
+ void* buffer = nullptr;
+ size_t head = written_size_ % bucket_data_.size();
+ uint32_t available = 0;
+ MojoResult result = mojo::BeginWriteDataRaw(
+ writer_.get(), &buffer, &available, MOJO_WRITE_DATA_FLAG_NONE);
+ size_t to_be_written = std::min(
+ static_cast<size_t>(available),
+ std::min(bucket_data_.size() - head, total_size_ - written_size_));
+
+ if (result == MOJO_RESULT_OK) {
+ std::copy(bucket_data_.begin() + head,
+ bucket_data_.begin() + head + to_be_written,
+ static_cast<char*>(buffer));
+ // fprintf(stderr, "Wrote %zu bytes\n",
+ // static_cast<size_t>(to_be_written));
+ mojo::EndWriteDataRaw(writer_.get(), to_be_written);
+ written_size_ += to_be_written;
+
+ if (written_size_ == total_size_) {
+ // fprintf(stderr, "wrote %zu bytes in total: close\n",
+ // written_size_);
+ writer_.reset();
+ return;
+ }
+ } else if (result == MOJO_RESULT_SHOULD_WAIT) {
+ StartWatching();
+ break;
+ } else {
+ callbacks_->onError();
+ bridge_->loader_test_job_ = nullptr;
+ // |this| is deleted here.
+ return;
+ }
+ }
+ }
+
+ void OnSuccess() {
+ callbacks_->onSuccess(
+ static_cast<int>((base::Time::Now() - start_time_).InMicroseconds()));
+ bridge_->loader_test_job_ = nullptr;
+ // |this| is deleted here.
+ }
+
+ private:
+ WebSocketBridge::LoaderTestIPC switch_;
+ scoped_ptr<blink::WebCallbacks<int, void>> callbacks_;
+ std::vector<char> bucket_data_;
+ size_t written_size_;
+ size_t buffer_size_;
+ size_t total_size_;
+ WebSocketBridge* bridge_;
+
+ // mojo-related
+ mojo::ScopedDataPipeProducerHandle writer_;
+ mojo::common::HandleWatcher handle_watcher_;
+
+ //chromium-ipc-related
+ scoped_ptr<base::SharedMemory> shm_buffer_;
+ std::deque<std::pair<size_t, size_t>> in_flight_requets_; // <offset, size>
+
+ base::Time start_time_;
+};
+
+void WebSocketBridge::loaderTestTransmit(LoaderTestIPC ipc, bool verify_data, size_t bucket_size, size_t buffer_size, size_t total_size, blink::WebCallbacks<int, void>* raw_callbacks) {
+ scoped_ptr<blink::WebCallbacks<int, void>> callbacks(raw_callbacks);
+
+ if (loader_test_job_) {
+ fprintf(stderr, "job exists\n");
+ callbacks->onError();
+ return;
+ }
+
+ WebSocketDispatcher* dispatcher =
+ ChildThreadImpl::current()->websocket_dispatcher();
+ loader_test_job_.reset(new LoaderTestJob(
+ ipc, std::move(callbacks), bucket_size, buffer_size, total_size, this));
+ loader_test_job_->Start(dispatcher->loader_test_service());
+}
+
+
+void WebSocketBridge::OnLoaderTransferTest_SetDataBuffer(
+ base::SharedMemoryHandle shm_handle) {
+ if (loader_test_job_)
+ loader_test_job_->OnSetDataBuffer(shm_handle);
+}
+
+void WebSocketBridge::OnLoaderTransferTest_ReceivedAck() {
+ if (loader_test_job_)
+ loader_test_job_->OnReceivedAck();
+}
+
+void WebSocketBridge::OnLoaderTransferTest_Done() {
+ if (loader_test_job_)
+ loader_test_job_->OnReceivedDone();
+}
+
} // namespace content
« no previous file with comments | « content/child/websocket_bridge.h ('k') | content/child/websocket_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698