Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(55)

Side by Side Diff: content/child/websocket_bridge.cc

Issue 1461283002: [DO NOT COMMIT] mojo datapipe performance measurement Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « content/child/websocket_bridge.h ('k') | content/child/websocket_dispatcher.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/child/websocket_bridge.h" 5 #include "content/child/websocket_bridge.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <string> 8 #include <string>
9 #include <utility> 9 #include <utility>
10 #include <vector> 10 #include <vector>
11 11
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/macros.h"
13 #include "base/strings/string_util.h" 14 #include "base/strings/string_util.h"
14 #include "content/child/child_thread_impl.h" 15 #include "content/child/child_thread_impl.h"
15 #include "content/child/websocket_dispatcher.h" 16 #include "content/child/websocket_dispatcher.h"
16 #include "content/common/websocket.h" 17 #include "content/common/websocket.h"
17 #include "content/common/websocket_messages.h" 18 #include "content/common/websocket_messages.h"
19 #include "mojo/message_pump/handle_watcher.h"
18 #include "ipc/ipc_message.h" 20 #include "ipc/ipc_message.h"
19 #include "ipc/ipc_message_macros.h" 21 #include "ipc/ipc_message_macros.h"
20 #include "third_party/WebKit/public/platform/WebSecurityOrigin.h" 22 #include "third_party/WebKit/public/platform/WebSecurityOrigin.h"
21 #include "third_party/WebKit/public/platform/WebString.h" 23 #include "third_party/WebKit/public/platform/WebString.h"
22 #include "third_party/WebKit/public/platform/WebURL.h" 24 #include "third_party/WebKit/public/platform/WebURL.h"
23 #include "third_party/WebKit/public/platform/WebVector.h" 25 #include "third_party/WebKit/public/platform/WebVector.h"
24 #include "third_party/WebKit/public/platform/modules/websockets/WebSocketHandle. h" 26 #include "third_party/WebKit/public/platform/modules/websockets/WebSocketHandle. h"
25 #include "third_party/WebKit/public/platform/modules/websockets/WebSocketHandleC lient.h" 27 #include "third_party/WebKit/public/platform/modules/websockets/WebSocketHandleC lient.h"
26 #include "third_party/WebKit/public/platform/modules/websockets/WebSocketHandsha keRequestInfo.h" 28 #include "third_party/WebKit/public/platform/modules/websockets/WebSocketHandsha keRequestInfo.h"
27 #include "third_party/WebKit/public/platform/modules/websockets/WebSocketHandsha keResponseInfo.h" 29 #include "third_party/WebKit/public/platform/modules/websockets/WebSocketHandsha keResponseInfo.h"
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
68 IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyStartOpeningHandshake, 70 IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyStartOpeningHandshake,
69 DidStartOpeningHandshake) 71 DidStartOpeningHandshake)
70 IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyFinishOpeningHandshake, 72 IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyFinishOpeningHandshake,
71 DidFinishOpeningHandshake) 73 DidFinishOpeningHandshake)
72 IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyFailure, DidFail) 74 IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyFailure, DidFail)
73 IPC_MESSAGE_HANDLER(WebSocketMsg_SendFrame, DidReceiveData) 75 IPC_MESSAGE_HANDLER(WebSocketMsg_SendFrame, DidReceiveData)
74 IPC_MESSAGE_HANDLER(WebSocketMsg_FlowControl, DidReceiveFlowControl) 76 IPC_MESSAGE_HANDLER(WebSocketMsg_FlowControl, DidReceiveFlowControl)
75 IPC_MESSAGE_HANDLER(WebSocketMsg_DropChannel, DidClose) 77 IPC_MESSAGE_HANDLER(WebSocketMsg_DropChannel, DidClose)
76 IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyClosing, 78 IPC_MESSAGE_HANDLER(WebSocketMsg_NotifyClosing,
77 DidStartClosingHandshake) 79 DidStartClosingHandshake)
80 IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_SetDataBuffer,
81 OnLoaderTransferTest_SetDataBuffer)
82 IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_Ack,
83 OnLoaderTransferTest_ReceivedAck)
84 IPC_MESSAGE_HANDLER(WebSocketMsg_LoaderTransferTest_Done,
85 OnLoaderTransferTest_Done)
86
78 IPC_MESSAGE_UNHANDLED(handled = false) 87 IPC_MESSAGE_UNHANDLED(handled = false)
79 IPC_END_MESSAGE_MAP() 88 IPC_END_MESSAGE_MAP()
80 return handled; 89 return handled;
81 } 90 }
82 91
83 void WebSocketBridge::DidConnect(const std::string& selected_protocol, 92 void WebSocketBridge::DidConnect(const std::string& selected_protocol,
84 const std::string& extensions) { 93 const std::string& extensions) {
85 WebSocketHandleClient* client = client_; 94 WebSocketHandleClient* client = client_;
86 DVLOG(1) << "WebSocketBridge::DidConnect(" 95 DVLOG(1) << "WebSocketBridge::DidConnect("
87 << selected_protocol << ", " 96 << selected_protocol << ", "
(...skipping 195 matching lines...) Expand 10 before | Expand all | Expand 10 after
283 if (channel_id_ == kInvalidChannelId) 292 if (channel_id_ == kInvalidChannelId)
284 return; 293 return;
285 WebSocketDispatcher* dispatcher = 294 WebSocketDispatcher* dispatcher =
286 ChildThreadImpl::current()->websocket_dispatcher(); 295 ChildThreadImpl::current()->websocket_dispatcher();
287 dispatcher->RemoveBridge(channel_id_); 296 dispatcher->RemoveBridge(channel_id_);
288 297
289 channel_id_ = kInvalidChannelId; 298 channel_id_ = kInvalidChannelId;
290 client_ = NULL; 299 client_ = NULL;
291 } 300 }
292 301
302 class WebSocketBridge::LoaderTestJob {
303 public:
304 LoaderTestJob(WebSocketBridge::LoaderTestIPC ipc,
305 scoped_ptr<blink::WebCallbacks<int, void>> callbacks,
306 size_t bucket_size,
307 size_t buffer_size,
308 size_t total_size,
309 WebSocketBridge* bridge)
310 : switch_(ipc),
311 callbacks_(std::move(callbacks)),
312 bucket_data_(std::vector<char>(bucket_size, 'a')),
313 written_size_(0),
314 buffer_size_(buffer_size),
315 total_size_(total_size),
316 bridge_(bridge) {}
317
318 void StartWatching() {
319 handle_watcher_.Start(
320 writer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE,
321 base::Bind(&LoaderTestJob::OnWritable, base::Unretained(this)));
322 }
323
324 void Start(LoaderTestService* service) {
325 start_time_ = base::Time::Now();
326 if (switch_ == WebSocketBridge::ViaChromiumIPC) {
327 StartChromiumIPCLoading();
328 } else {
329 StartMojoLoading(service);
330 }
331 }
332
333 void StartChromiumIPCLoading() {
334 ChildThreadImpl::current()->Send(
335 new WebSocketHostMsg_LoaderTransferTest_Setup(bridge_->channel_id_,
336 buffer_size_));
337 }
338
339 void OnSetDataBuffer(base::SharedMemoryHandle shm_handle) {
340 CHECK(base::SharedMemory::IsHandleValid(shm_handle));
341
342 shm_buffer_.reset(new base::SharedMemory(shm_handle, false));
343
344 bool ok = shm_buffer_->Map(buffer_size_);
345 CHECK(ok);
346
347 while (Send()) {
348 }
349 }
350
351 bool Send() {
352 size_t offset = 0;
353 size_t to_be_written = buffer_size_;
354 if (!in_flight_requets_.empty()) {
355 const auto& front = in_flight_requets_.front();
356 const auto& back = in_flight_requets_.back();
357 if (back.first + back.second != buffer_size_) {
358 offset = back.first + back.second;
359 to_be_written = std::min(to_be_written, buffer_size_ - offset);
360 }
361 if (offset <= front.first) {
362 to_be_written = std::min(to_be_written, front.first - offset);
363 }
364 }
365 size_t head = written_size_ % bucket_data_.size();
366 to_be_written =
367 std::min(to_be_written,
368 std::min(bucket_data_.size() - head,
369 total_size_ - written_size_));
370 if (to_be_written == 0)
371 return false;
372
373 std::copy(bucket_data_.begin() + head,
374 bucket_data_.begin() + head + to_be_written,
375 static_cast<char*>(shm_buffer_->memory()) + offset);
376 written_size_ += to_be_written;
377 ChildThreadImpl::current()->Send(
378 new WebSocketHostMsg_LoaderTransferTest_Send(
379 bridge_->channel_id_, offset, to_be_written));
380
381 in_flight_requets_.push_back(std::make_pair(offset, to_be_written));
382 if (written_size_ == total_size_) {
383 ChildThreadImpl::current()->Send(
384 new WebSocketHostMsg_LoaderTransferTest_Close(
385 bridge_->channel_id_));
386 }
387 return true;
388 }
389
390 void OnReceivedAck() {
391 // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
392 in_flight_requets_.pop_front();
393 while (Send()) {
394 }
395 }
396
397 void OnReceivedDone() {
398 CHECK(in_flight_requets_.empty());
399 callbacks_->onSuccess(
400 static_cast<int>((base::Time::Now() - start_time_).InMicroseconds()));
401 bridge_->loader_test_job_ = nullptr;
402 // |this| is deleted here.
403 }
404
405 void StartMojoLoading(LoaderTestService* service) {
406 // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
407 MojoCreateDataPipeOptions options;
408 options.struct_size = sizeof(MojoCreateDataPipeOptions);
409 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
410 options.element_num_bytes = 1;
411 options.capacity_num_bytes = buffer_size_;
412 mojo::DataPipe data_pipe(options);
413 mojo::ScopedDataPipeConsumerHandle handle;
414
415 writer_ = std::move(data_pipe.producer_handle);
416 service->Transmit(
417 std::move(data_pipe.consumer_handle),
418 base::Bind(&LoaderTestJob::OnSuccess, base::Unretained(this)));
419 StartWatching();
420 }
421
422 void OnWritable(MojoResult result) {
423 // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
424 while (true) {
425 void* buffer = nullptr;
426 size_t head = written_size_ % bucket_data_.size();
427 uint32_t available = 0;
428 MojoResult result = mojo::BeginWriteDataRaw(
429 writer_.get(), &buffer, &available, MOJO_WRITE_DATA_FLAG_NONE);
430 size_t to_be_written = std::min(
431 static_cast<size_t>(available),
432 std::min(bucket_data_.size() - head, total_size_ - written_size_));
433
434 if (result == MOJO_RESULT_OK) {
435 std::copy(bucket_data_.begin() + head,
436 bucket_data_.begin() + head + to_be_written,
437 static_cast<char*>(buffer));
438 // fprintf(stderr, "Wrote %zu bytes\n",
439 // static_cast<size_t>(to_be_written));
440 mojo::EndWriteDataRaw(writer_.get(), to_be_written);
441 written_size_ += to_be_written;
442
443 if (written_size_ == total_size_) {
444 // fprintf(stderr, "wrote %zu bytes in total: close\n",
445 // written_size_);
446 writer_.reset();
447 return;
448 }
449 } else if (result == MOJO_RESULT_SHOULD_WAIT) {
450 StartWatching();
451 break;
452 } else {
453 callbacks_->onError();
454 bridge_->loader_test_job_ = nullptr;
455 // |this| is deleted here.
456 return;
457 }
458 }
459 }
460
461 void OnSuccess() {
462 callbacks_->onSuccess(
463 static_cast<int>((base::Time::Now() - start_time_).InMicroseconds()));
464 bridge_->loader_test_job_ = nullptr;
465 // |this| is deleted here.
466 }
467
468 private:
469 WebSocketBridge::LoaderTestIPC switch_;
470 scoped_ptr<blink::WebCallbacks<int, void>> callbacks_;
471 std::vector<char> bucket_data_;
472 size_t written_size_;
473 size_t buffer_size_;
474 size_t total_size_;
475 WebSocketBridge* bridge_;
476
477 // mojo-related
478 mojo::ScopedDataPipeProducerHandle writer_;
479 mojo::common::HandleWatcher handle_watcher_;
480
481 //chromium-ipc-related
482 scoped_ptr<base::SharedMemory> shm_buffer_;
483 std::deque<std::pair<size_t, size_t>> in_flight_requets_; // <offset, size>
484
485 base::Time start_time_;
486 };
487
488 void WebSocketBridge::loaderTestTransmit(LoaderTestIPC ipc, bool verify_data, si ze_t bucket_size, size_t buffer_size, size_t total_size, blink::WebCallbacks<int , void>* raw_callbacks) {
489 scoped_ptr<blink::WebCallbacks<int, void>> callbacks(raw_callbacks);
490
491 if (loader_test_job_) {
492 fprintf(stderr, "job exists\n");
493 callbacks->onError();
494 return;
495 }
496
497 WebSocketDispatcher* dispatcher =
498 ChildThreadImpl::current()->websocket_dispatcher();
499 loader_test_job_.reset(new LoaderTestJob(
500 ipc, std::move(callbacks), bucket_size, buffer_size, total_size, this));
501 loader_test_job_->Start(dispatcher->loader_test_service());
502 }
503
504
505 void WebSocketBridge::OnLoaderTransferTest_SetDataBuffer(
506 base::SharedMemoryHandle shm_handle) {
507 if (loader_test_job_)
508 loader_test_job_->OnSetDataBuffer(shm_handle);
509 }
510
511 void WebSocketBridge::OnLoaderTransferTest_ReceivedAck() {
512 if (loader_test_job_)
513 loader_test_job_->OnReceivedAck();
514 }
515
516 void WebSocketBridge::OnLoaderTransferTest_Done() {
517 if (loader_test_job_)
518 loader_test_job_->OnReceivedDone();
519 }
520
293 } // namespace content 521 } // namespace content
OLDNEW
« no previous file with comments | « content/child/websocket_bridge.h ('k') | content/child/websocket_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698