Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(493)

Side by Side Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1144033002: Make SharedMemoryDataConsumerHandle thread-safe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@ipc-data-consumer
Patch Set: rebase Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/child/shared_memory_data_consumer_handle.h" 5 #include "content/child/shared_memory_data_consumer_handle.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <deque> 8 #include <deque>
9 #include <vector> 9 #include <vector>
10 10
11 #include "base/bind.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/synchronization/lock.h"
11 #include "content/public/child/fixed_received_data.h" 15 #include "content/public/child/fixed_received_data.h"
12 16
13 namespace content { 17 namespace content {
14 18
19 namespace {
20
21 class DelegateThreadSafeReceivedData final
22 : public RequestPeer::ThreadSafeReceivedData {
23 public:
24 explicit DelegateThreadSafeReceivedData(
25 scoped_ptr<RequestPeer::ReceivedData> data)
26 : data_(data.Pass()),
27 task_runner_(base::MessageLoop::current()->task_runner()) {}
28 ~DelegateThreadSafeReceivedData() override {
29 if (!task_runner_->BelongsToCurrentThread()) {
30 // Delete the data on the original thread.
31 task_runner_->DeleteSoon(FROM_HERE, data_.release());
32 }
33 }
34
35 const char* payload() const override { return data_->payload(); }
36 int length() const override { return data_->length(); }
37 int encoded_length() const override { return data_->encoded_length(); }
38
39 private:
40 scoped_ptr<RequestPeer::ReceivedData> data_;
41 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
42
43 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
44 };
45
46 } // namespace
47
15 using Result = blink::WebDataConsumerHandle::Result; 48 using Result = blink::WebDataConsumerHandle::Result;
16 49
17 class SharedMemoryDataConsumerHandle::Context final 50 class SharedMemoryDataConsumerHandle::Context final
18 : public base::RefCountedThreadSafe<Context> { 51 : public base::RefCountedThreadSafe<Context> {
19 public: 52 public:
20 Context() 53 Context()
21 : result_(Ok), 54 : result_(Ok),
22 first_offset_(0), 55 first_offset_(0),
23 client_(nullptr), 56 client_(nullptr),
24 is_reader_active_(true) {} 57 is_reader_active_(true) {}
25 58
26 bool IsEmpty() const { return queue_.empty(); } 59 bool IsEmpty() const { return queue_.empty(); }
27 void Clear() { 60 void Clear() {
28 for (RequestPeer::ReceivedData* data : queue_) { 61 for (auto& data : queue_) {
29 delete data; 62 delete data;
30 } 63 }
31 queue_.clear(); 64 queue_.clear();
32 first_offset_ = 0; 65 first_offset_ = 0;
33 client_ = nullptr; 66 client_ = nullptr;
34 } 67 }
35 RequestPeer::ReceivedData* Top() { return queue_.front(); } 68 void Notify() {
36 void Push(scoped_ptr<RequestPeer::ReceivedData> data) { 69 // Note that this function is not protected by |lock_| (actually it
70 // shouldn't be) but |notification_task_runner_| is thread-safe.
71
72 if (notification_task_runner_->BelongsToCurrentThread()) {
73 NotifyImmediately();
74 } else {
75 notification_task_runner_->PostTask(
76 FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
77 }
78 }
79
80 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
81 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
37 queue_.push_back(data.release()); 82 queue_.push_back(data.release());
38 } 83 }
39 size_t first_offset() const { return first_offset_; } 84 size_t first_offset() const { return first_offset_; }
40 Result result() const { return result_; } 85 Result result() const { return result_; }
41 void set_result(Result r) { result_ = r; } 86 void set_result(Result r) { result_ = r; }
42 Client* client() { return client_; } 87 Client* client() { return client_; }
43 void set_client(Client* client) { client_ = client; } 88 void SetClient(Client* client) {
89 if (client) {
90 notification_task_runner_ = base::MessageLoop::current()->task_runner();
91 client_ = client;
92 } else {
93 notification_task_runner_ = nullptr;
94 client_ = nullptr;
95 }
96 }
44 bool is_reader_active() const { return is_reader_active_; } 97 bool is_reader_active() const { return is_reader_active_; }
45 void set_is_reader_active(bool b) { is_reader_active_ = b; } 98 void set_is_reader_active(bool b) { is_reader_active_ = b; }
46 void Consume(size_t s) { 99 void Consume(size_t s) {
47 first_offset_ += s; 100 first_offset_ += s;
48 RequestPeer::ReceivedData* top = Top(); 101 auto top = Top();
49 if (static_cast<size_t>(top->length()) <= first_offset_) { 102 if (static_cast<size_t>(top->length()) <= first_offset_) {
50 delete top; 103 delete top;
51 queue_.pop_front(); 104 queue_.pop_front();
52 first_offset_ = 0; 105 first_offset_ = 0;
53 } 106 }
54 } 107 }
108 base::Lock& lock() { return lock_; }
55 109
56 private: 110 private:
57 friend class base::RefCountedThreadSafe<Context>; 111 friend class base::RefCountedThreadSafe<Context>;
58 ~Context() { 112 ~Context() {
59 // This is necessary because the queue stores raw pointers. 113 // This is necessary because the queue stores raw pointers.
60 Clear(); 114 Clear();
61 } 115 }
62 116
117 void NotifyImmediately() {
118 // As we can assume that all reader-side methods are called on this
119 // thread (see WebDataConsumerHandle comments), we don't need to lock.
120 if (client_)
121 client_->didGetReadable();
122 }
123
124 base::Lock lock_;
63 // |result_| stores the ultimate state of this handle if it has. Otherwise, 125 // |result_| stores the ultimate state of this handle if it has. Otherwise,
64 // |Ok| is set. 126 // |Ok| is set.
65 Result result_; 127 Result result_;
66 // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed. 128 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
67 std::deque<RequestPeer::ReceivedData*> queue_; 129 // once it is allowed.
130 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
68 size_t first_offset_; 131 size_t first_offset_;
69 Client* client_; 132 Client* client_;
133 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
70 bool is_reader_active_; 134 bool is_reader_active_;
71 135
72 DISALLOW_COPY_AND_ASSIGN(Context); 136 DISALLOW_COPY_AND_ASSIGN(Context);
73 }; 137 };
74 138
75 SharedMemoryDataConsumerHandle::Writer::Writer( 139 SharedMemoryDataConsumerHandle::Writer::Writer(
76 const scoped_refptr<Context>& context, 140 const scoped_refptr<Context>& context,
77 BackpressureMode mode) 141 BackpressureMode mode)
78 : context_(context), mode_(mode) { 142 : context_(context), mode_(mode) {
79 } 143 }
80 144
81 SharedMemoryDataConsumerHandle::Writer::~Writer() { 145 SharedMemoryDataConsumerHandle::Writer::~Writer() {
82 Close(); 146 Close();
83 } 147 }
84 148
85 void SharedMemoryDataConsumerHandle::Writer::AddData( 149 void SharedMemoryDataConsumerHandle::Writer::AddData(
86 scoped_ptr<RequestPeer::ReceivedData> data) { 150 scoped_ptr<RequestPeer::ReceivedData> data) {
87 if (!data->length()) { 151 if (!data->length()) {
88 // We omit empty data. 152 // We omit empty data.
89 return; 153 return;
90 } 154 }
91 155
92 if (!context_->is_reader_active()) { 156 bool needs_notification = false;
93 // No one is interested in the data. 157 {
94 return; 158 base::AutoLock lock(context_->lock());
159 if (!context_->is_reader_active()) {
160 // No one is interested in the data.
161 return;
162 }
163
164 needs_notification = context_->client() && context_->IsEmpty();
165 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
166 if (mode_ == kApplyBackpressure) {
167 data_to_pass =
168 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
169 } else {
170 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
171 }
172 context_->Push(data_to_pass.Pass());
95 } 173 }
96 174
97 bool needs_notification = context_->client() && context_->IsEmpty();
98 scoped_ptr<RequestPeer::ReceivedData> data_to_pass;
99 if (mode_ == kApplyBackpressure) {
100 data_to_pass = data.Pass();
101 } else {
102 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
103 }
104 context_->Push(data_to_pass.Pass());
105
106 if (needs_notification) 175 if (needs_notification)
107 context_->client()->didGetReadable(); 176 context_->Notify();
108 } 177 }
109 178
110 void SharedMemoryDataConsumerHandle::Writer::Close() { 179 void SharedMemoryDataConsumerHandle::Writer::Close() {
111 if (context_->result() == Ok) { 180 bool needs_notification = false;
112 context_->set_result(Done); 181
113 if (context_->client() && context_->IsEmpty()) 182 {
114 context_->client()->didGetReadable(); 183 base::AutoLock lock(context_->lock());
184 if (context_->result() == Ok) {
185 context_->set_result(Done);
186 needs_notification = context_->client() && context_->IsEmpty();
187 }
115 } 188 }
189 if (needs_notification)
190 context_->Notify();
116 } 191 }
117 192
118 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( 193 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
119 BackpressureMode mode, 194 BackpressureMode mode,
120 scoped_ptr<Writer>* writer) 195 scoped_ptr<Writer>* writer)
121 : context_(new Context) { 196 : context_(new Context) {
122 writer->reset(new Writer(context_, mode)); 197 writer->reset(new Writer(context_, mode));
123 } 198 }
124 199
125 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { 200 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
201 base::AutoLock lock(context_->lock());
126 context_->set_is_reader_active(false); 202 context_->set_is_reader_active(false);
127 context_->Clear(); 203 context_->Clear();
128 } 204 }
129 205
130 Result SharedMemoryDataConsumerHandle::read(void* data, 206 Result SharedMemoryDataConsumerHandle::read(void* data,
131 size_t size, 207 size_t size,
132 Flags flags, 208 Flags flags,
133 size_t* read_size_to_return) { 209 size_t* read_size_to_return) {
210 base::AutoLock lock(context_->lock());
211
134 size_t total_read_size = 0; 212 size_t total_read_size = 0;
135 *read_size_to_return = 0; 213 *read_size_to_return = 0;
136 if (context_->result() != Ok && context_->result() != Done) 214 if (context_->result() != Ok && context_->result() != Done)
137 return context_->result(); 215 return context_->result();
138 216
139 while (!context_->IsEmpty() && total_read_size < size) { 217 while (!context_->IsEmpty() && total_read_size < size) {
140 const auto& top = context_->Top(); 218 const auto& top = context_->Top();
141 size_t readable = top->length() - context_->first_offset(); 219 size_t readable = top->length() - context_->first_offset();
142 size_t writable = size - total_read_size; 220 size_t writable = size - total_read_size;
143 size_t read_size = std::min(readable, writable); 221 size_t read_size = std::min(readable, writable);
144 const char* begin = top->payload() + context_->first_offset(); 222 const char* begin = top->payload() + context_->first_offset();
145 std::copy(begin, begin + read_size, 223 std::copy(begin, begin + read_size,
146 static_cast<char*>(data) + total_read_size); 224 static_cast<char*>(data) + total_read_size);
147 total_read_size += read_size; 225 total_read_size += read_size;
148 context_->Consume(read_size); 226 context_->Consume(read_size);
149 } 227 }
150 *read_size_to_return = total_read_size; 228 *read_size_to_return = total_read_size;
151 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; 229 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
152 } 230 }
153 231
154 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, 232 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
155 Flags flags, 233 Flags flags,
156 size_t* available) { 234 size_t* available) {
157 *buffer = nullptr; 235 *buffer = nullptr;
158 *available = 0; 236 *available = 0;
159 237
238 base::AutoLock lock(context_->lock());
239
160 if (context_->result() != Ok && context_->result() != Done) 240 if (context_->result() != Ok && context_->result() != Done)
161 return context_->result(); 241 return context_->result();
162 242
163 if (context_->IsEmpty()) 243 if (context_->IsEmpty())
164 return context_->result() == Done ? Done : ShouldWait; 244 return context_->result() == Done ? Done : ShouldWait;
165 245
166 const auto& top = context_->Top(); 246 const auto& top = context_->Top();
167 *buffer = top->payload() + context_->first_offset(); 247 *buffer = top->payload() + context_->first_offset();
168 *available = top->length() - context_->first_offset(); 248 *available = top->length() - context_->first_offset();
169 249
170 return Ok; 250 return Ok;
171 } 251 }
172 252
173 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { 253 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
254 base::AutoLock lock(context_->lock());
255
174 if (context_->IsEmpty()) 256 if (context_->IsEmpty())
175 return UnexpectedError; 257 return UnexpectedError;
176 258
177 context_->Consume(read_size); 259 context_->Consume(read_size);
178 return Ok; 260 return Ok;
179 } 261 }
180 262
181 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { 263 void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
182 context_->set_client(client); 264 bool needs_notification = false;
265 {
266 base::AutoLock lock(context_->lock());
183 267
184 if (!context_->IsEmpty()) 268 context_->SetClient(client);
185 client->didGetReadable(); 269 needs_notification = !context_->IsEmpty();
270 }
271 if (needs_notification)
272 context_->Notify();
186 } 273 }
187 274
188 void SharedMemoryDataConsumerHandle::unregisterClient() { 275 void SharedMemoryDataConsumerHandle::unregisterClient() {
189 context_->set_client(nullptr); 276 base::AutoLock lock(context_->lock());
277
278 context_->SetClient(nullptr);
190 } 279 }
191 280
192 } // namespace content 281 } // namespace content
OLDNEW
« no previous file with comments | « content/child/shared_memory_data_consumer_handle.h ('k') | content/child/shared_memory_data_consumer_handle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698