Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(824)

Side by Side Diff: mojo/services/network/public/cpp/web_socket_read_queue.cc

Issue 1281073003: Avoid using two-phase read with MOJO_READ_DATA_FLAG_ALL_OR_NONE. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/services/network/public/cpp/web_socket_read_queue.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "network/public/cpp/web_socket_read_queue.h" 5 #include "network/public/cpp/web_socket_read_queue.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/memory/scoped_ptr.h"
9 10
10 namespace mojo { 11 namespace mojo {
11 12
12 struct WebSocketReadQueue::Operation { 13 struct WebSocketReadQueue::Operation {
14 Operation(uint32_t num_bytes,
15 const base::Callback<void(const char*)>& callback)
16 : num_bytes_(num_bytes), callback_(callback), current_num_bytes_(0) {}
17
13 uint32_t num_bytes_; 18 uint32_t num_bytes_;
14 base::Callback<void(const char*)> callback_; 19 base::Callback<void(const char*)> callback_;
20
21 // If the initial read doesn't return enough data, this array is used to
22 // accumulate data from multiple reads.
23 scoped_ptr<char[]> data_buffer_;
24 // The number of bytes accumulated so far.
25 uint32_t current_num_bytes_;
15 }; 26 };
16 27
17 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle) 28 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
18 : handle_(handle), is_busy_(false), weak_factory_(this) { 29 : handle_(handle), is_busy_(false), weak_factory_(this) {
19 } 30 }
20 31
21 WebSocketReadQueue::~WebSocketReadQueue() { 32 WebSocketReadQueue::~WebSocketReadQueue() {
22 } 33 }
23 34
24 void WebSocketReadQueue::Read(uint32_t num_bytes, 35 void WebSocketReadQueue::Read(
25 base::Callback<void(const char*)> callback) { 36 uint32_t num_bytes,
26 Operation* op = new Operation; 37 const base::Callback<void(const char*)>& callback) {
27 op->num_bytes_ = num_bytes; 38 Operation* op = new Operation(num_bytes, callback);
28 op->callback_ = callback;
29 queue_.push_back(op); 39 queue_.push_back(op);
30 40
31 if (is_busy_) 41 if (is_busy_)
32 return; 42 return;
33 43
34 is_busy_ = true; 44 is_busy_ = true;
35 TryToRead(); 45 TryToRead();
36 } 46 }
37 47
38 void WebSocketReadQueue::TryToRead() { 48 void WebSocketReadQueue::TryToRead() {
39 DCHECK(is_busy_); 49 DCHECK(is_busy_);
40 DCHECK(!queue_.empty()); 50 DCHECK(!queue_.empty());
41 do { 51 do {
42 Operation* op = queue_[0]; 52 Operation* op = queue_[0];
43 const void* buffer = NULL; 53 const void* buffer = nullptr;
44 uint32_t bytes_read = op->num_bytes_; 54 uint32_t buffer_size = 0;
45 MojoResult result = BeginReadDataRaw( 55 MojoResult result = BeginReadDataRaw(handle_, &buffer, &buffer_size,
46 handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE); 56 MOJO_READ_DATA_FLAG_NONE);
47 if (result == MOJO_RESULT_SHOULD_WAIT) { 57 if (result == MOJO_RESULT_SHOULD_WAIT) {
48 Wait(); 58 Wait();
49 return; 59 return;
50 } 60 }
51 61
52 // Ensure |op| is deleted, whether or not |this| goes away.
53 scoped_ptr<Operation> op_deleter(op);
54 queue_.weak_erase(queue_.begin());
55
56 // http://crbug.com/490193 This should run callback as well. May need to 62 // http://crbug.com/490193 This should run callback as well. May need to
57 // change the callback signature. 63 // change the callback signature.
58 if (result != MOJO_RESULT_OK) 64 if (result != MOJO_RESULT_OK)
59 return; 65 return;
60 66
61 uint32_t num_bytes = op_deleter->num_bytes_; 67 uint32_t bytes_read = buffer_size < op->num_bytes_ - op->current_num_bytes_
62 DCHECK_LE(num_bytes, bytes_read); 68 ? buffer_size
69 : op->num_bytes_ - op->current_num_bytes_;
70
71 // If this is not the initial read, or this is the initial read but doesn't
72 // return enough data, copy the data into |op->data_buffer_|.
73 if (op->data_buffer_ ||
74 bytes_read < op->num_bytes_ - op->current_num_bytes_) {
75 if (!op->data_buffer_) {
76 DCHECK_EQ(0u, op->current_num_bytes_);
77 op->data_buffer_.reset(new char[op->num_bytes_]);
78 }
79
80 memcpy(op->data_buffer_.get() + op->current_num_bytes_, buffer,
81 bytes_read);
82 }
83 op->current_num_bytes_ += bytes_read;
63 DataPipeConsumerHandle handle = handle_; 84 DataPipeConsumerHandle handle = handle_;
64
65 base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr()); 85 base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr());
66 86
67 // This call may delete |this|. In that case, |self| will be invalidated. 87 if (op->current_num_bytes_ >= op->num_bytes_) {
68 // It may re-enter Read() too. Because |is_busy_| is true during the whole 88 DCHECK_EQ(op->current_num_bytes_, op->num_bytes_);
69 // process, TryToRead() won't be re-entered. 89 const char* returned_buffer = op->data_buffer_
70 op->callback_.Run(static_cast<const char*>(buffer)); 90 ? op->data_buffer_.get()
91 : static_cast<const char*>(buffer);
71 92
72 EndReadDataRaw(handle, num_bytes); 93 // Ensure |op| is deleted, whether or not |this| goes away.
94 scoped_ptr<Operation> op_deleter(op);
95 queue_.weak_erase(queue_.begin());
96
97 // This call may delete |this|. In that case, |self| will be invalidated.
98 // It may re-enter Read() too. Because |is_busy_| is true during the whole
99 // process, TryToRead() won't be re-entered.
100 op->callback_.Run(returned_buffer);
101 }
102
103 EndReadDataRaw(handle, bytes_read);
73 104
74 if (!self) 105 if (!self)
75 return; 106 return;
76 } while (!queue_.empty()); 107 } while (!queue_.empty());
77 is_busy_ = false; 108 is_busy_ = false;
78 } 109 }
79 110
80 void WebSocketReadQueue::Wait() { 111 void WebSocketReadQueue::Wait() {
81 DCHECK(is_busy_); 112 DCHECK(is_busy_);
82 handle_watcher_.Start( 113 handle_watcher_.Start(
83 handle_, 114 handle_,
84 MOJO_HANDLE_SIGNAL_READABLE, 115 MOJO_HANDLE_SIGNAL_READABLE,
85 MOJO_DEADLINE_INDEFINITE, 116 MOJO_DEADLINE_INDEFINITE,
86 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this))); 117 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this)));
87 } 118 }
88 119
89 void WebSocketReadQueue::OnHandleReady(MojoResult result) { 120 void WebSocketReadQueue::OnHandleReady(MojoResult result) {
90 DCHECK(is_busy_); 121 DCHECK(is_busy_);
91 TryToRead(); 122 TryToRead();
92 } 123 }
93 124
94 } // namespace mojo 125 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/services/network/public/cpp/web_socket_read_queue.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698