Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(531)

Side by Side Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1144033002: Make SharedMemoryDataConsumerHandle thread-safe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@ipc-data-consumer
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/child/shared_memory_data_consumer_handle.h" 5 #include "content/child/shared_memory_data_consumer_handle.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <deque> 8 #include <deque>
9 #include <vector> 9 #include <vector>
10 10
11 #include "base/bind.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/synchronization/lock.h"
15
11 namespace content { 16 namespace content {
12 17
13 namespace { 18 namespace {
14 19
15 class FixedReceivedData final : public RequestPeer::ReceivedData { 20 // A ThreadSafeReceivedData can be deleted on ANY thread.
21 class ThreadSafeReceivedData : public RequestPeer::ReceivedData {};
22
23 class FixedReceivedData final : public ThreadSafeReceivedData {
16 public: 24 public:
17 explicit FixedReceivedData(RequestPeer::ReceivedData* data) 25 explicit FixedReceivedData(RequestPeer::ReceivedData* data)
18 : data_(data->payload(), data->payload() + data->length()), 26 : data_(data->payload(), data->payload() + data->length()),
19 encoded_length_(data->encoded_length()) {} 27 encoded_length_(data->encoded_length()) {}
20 28
21 const char* payload() const override { 29 const char* payload() const override {
22 return data_.empty() ? nullptr : &data_[0]; 30 return data_.empty() ? nullptr : &data_[0];
23 } 31 }
24 int length() const override { return static_cast<int>(data_.size()); } 32 int length() const override { return static_cast<int>(data_.size()); }
25 int encoded_length() const override { return encoded_length_; } 33 int encoded_length() const override { return encoded_length_; }
26 34
27 private: 35 private:
28 const std::vector<char> data_; 36 const std::vector<char> data_;
29 const int encoded_length_; 37 const int encoded_length_;
30 38
31 DISALLOW_COPY_AND_ASSIGN(FixedReceivedData); 39 DISALLOW_COPY_AND_ASSIGN(FixedReceivedData);
32 }; 40 };
33 41
42 class DelegateThreadSafeReceivedData final : public ThreadSafeReceivedData {
43 public:
44 explicit DelegateThreadSafeReceivedData(
45 scoped_ptr<RequestPeer::ReceivedData> data)
46 : data_(data.Pass()),
47 task_runner_(base::MessageLoop::current()->task_runner()) {}
48 ~DelegateThreadSafeReceivedData() override {
49 if (!task_runner_->BelongsToCurrentThread()) {
50 // Delete the data on the original thread.
51 task_runner_->DeleteSoon(FROM_HERE, data_.release());
52 }
53 }
54
55 const char* payload() const override { return data_->payload(); }
56 int length() const override { return data_->length(); }
57 int encoded_length() const override { return data_->encoded_length(); }
58
59 private:
60 scoped_ptr<RequestPeer::ReceivedData> data_;
61 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
62
63 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
64 };
65
34 } // namespace 66 } // namespace
35 67
36 using Result = blink::WebDataConsumerHandle::Result; 68 using Result = blink::WebDataConsumerHandle::Result;
37 using Data = RequestPeer::ReceivedData;
38 69
39 class SharedMemoryDataConsumerHandle::Context final 70 class SharedMemoryDataConsumerHandle::Context final
40 : public base::RefCountedThreadSafe<Context> { 71 : public base::RefCountedThreadSafe<Context> {
41 public: 72 public:
42 Context() 73 Context()
43 : result_(Ok), 74 : result_(Ok),
44 first_offset_(0), 75 first_offset_(0),
45 client_(nullptr), 76 client_(nullptr),
46 is_reader_active_(true) {} 77 is_reader_active_(true) {}
47 78
48 bool IsEmpty() const { return queue_.empty(); } 79 bool IsEmpty() const { return queue_.empty(); }
49 void Clear() { 80 void Clear() {
50 for (Data* data : queue_) { 81 for (auto& data : queue_) {
51 delete data; 82 delete data;
52 } 83 }
53 queue_.clear(); 84 queue_.clear();
54 first_offset_ = 0; 85 first_offset_ = 0;
55 client_ = nullptr; 86 client_ = nullptr;
56 } 87 }
57 void PopReadData() { 88 void PopReadData() {
58 Data* top = Top(); 89 auto top = Top();
59 if (static_cast<size_t>(top->length()) <= first_offset_) { 90 if (static_cast<size_t>(top->length()) <= first_offset_) {
60 delete top; 91 delete top;
61 queue_.pop_front(); 92 queue_.pop_front();
62 first_offset_ = 0; 93 first_offset_ = 0;
63 } 94 }
64 } 95 }
65 Data* Top() { return queue_.front(); } 96 void Notify() {
66 void Push(scoped_ptr<Data> data) { queue_.push_back(data.release()); } 97 // Note that this function is not protected by |lock_| (actually it
98 // shouldn't be) but |notification_task_runner_| is thread-safe.
99
100 if (notification_task_runner_->BelongsToCurrentThread()) {
101 NotifyImmediately();
102 } else {
103 notification_task_runner_->PostTask(
104 FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
105 }
106 }
107
108 ThreadSafeReceivedData* Top() { return queue_.front(); }
109 void Push(scoped_ptr<ThreadSafeReceivedData> data) {
110 queue_.push_back(data.release());
111 }
67 size_t first_offset() const { return first_offset_; } 112 size_t first_offset() const { return first_offset_; }
68 void AdvanceFirstOffset(size_t s) { first_offset_ += s; } 113 void AdvanceFirstOffset(size_t s) { first_offset_ += s; }
69 Result result() const { return result_; } 114 Result result() const { return result_; }
70 void set_result(Result r) { result_ = r; } 115 void set_result(Result r) { result_ = r; }
71 Client* client() { return client_; } 116 Client* client() { return client_; }
72 void set_client(Client* client) { client_ = client; } 117 void SetClient(Client* client) {
118 if (client) {
119 notification_task_runner_ = base::MessageLoop::current()->task_runner();
120 client_ = client;
121 } else {
122 notification_task_runner_ = nullptr;
123 client_ = nullptr;
124 }
125 }
73 bool is_reader_active() const { return is_reader_active_; } 126 bool is_reader_active() const { return is_reader_active_; }
74 void set_is_reader_active(bool b) { is_reader_active_ = b; } 127 void set_is_reader_active(bool b) { is_reader_active_ = b; }
128 base::Lock& lock() { return lock_; }
75 129
76 private: 130 private:
77 friend class base::RefCountedThreadSafe<Context>; 131 friend class base::RefCountedThreadSafe<Context>;
78 ~Context() { 132 ~Context() {
79 // This is necessary because the queue stores raw pointers. 133 // This is necessary because the queue stores raw pointers.
80 Clear(); 134 Clear();
81 } 135 }
82 136
137 void NotifyImmediately() {
138 // As we can assume that all reader-side methods are called on this
139 // thread (see WebDataConsumerHandle comments), we don't need to lock.
140 if (client_)
141 client_->didGetReadable();
142 }
143
144 base::Lock lock_;
83 // |result_| stores the ultimate state of this handle if it has. Otherwise, 145 // |result_| stores the ultimate state of this handle if it has. Otherwise,
84 // |Ok| is set. 146 // |Ok| is set.
85 Result result_; 147 Result result_;
86 // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed. 148 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
87 std::deque<RequestPeer::ReceivedData*> queue_; 149 // once it is allowed.
150 std::deque<ThreadSafeReceivedData*> queue_;
88 size_t first_offset_; 151 size_t first_offset_;
89 Client* client_; 152 Client* client_;
153 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
90 bool is_reader_active_; 154 bool is_reader_active_;
91 155
92 DISALLOW_COPY_AND_ASSIGN(Context); 156 DISALLOW_COPY_AND_ASSIGN(Context);
93 }; 157 };
94 158
95 SharedMemoryDataConsumerHandle::Writer::Writer( 159 SharedMemoryDataConsumerHandle::Writer::Writer(
96 const scoped_refptr<Context>& context, 160 const scoped_refptr<Context>& context,
97 BackpressureMode mode) 161 BackpressureMode mode)
98 : context_(context), mode_(mode) { 162 : context_(context), mode_(mode) {
99 } 163 }
100 164
101 SharedMemoryDataConsumerHandle::Writer::~Writer() { 165 SharedMemoryDataConsumerHandle::Writer::~Writer() {
102 Close(); 166 Close();
103 } 167 }
104 168
105 void SharedMemoryDataConsumerHandle::Writer::AddData(scoped_ptr<Data> data) { 169 void SharedMemoryDataConsumerHandle::Writer::AddData(
170 scoped_ptr<RequestPeer::ReceivedData> data) {
106 if (!data->length()) { 171 if (!data->length()) {
107 // We omit empty data. 172 // We omit empty data.
108 return; 173 return;
109 } 174 }
110 175
111 if (!context_->is_reader_active()) { 176 bool needs_notification = false;
112 // No one is interested in the data. 177 {
113 return; 178 base::AutoLock lock(context_->lock());
179 if (!context_->is_reader_active()) {
180 // No one is interested in the data.
181 return;
182 }
183
184 needs_notification = context_->client() && context_->IsEmpty();
185 scoped_ptr<ThreadSafeReceivedData> data_to_pass;
186 if (mode_ == kApplyBackpressure) {
187 data_to_pass =
188 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
189 } else {
190 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
191 }
192 context_->Push(data_to_pass.Pass());
114 } 193 }
115 194
116 bool needs_notification = context_->client() && context_->IsEmpty();
117 scoped_ptr<RequestPeer::ReceivedData> data_to_pass;
118 if (mode_ == kApplyBackpressure) {
119 data_to_pass = data.Pass();
120 } else {
121 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
122 }
123 context_->Push(data_to_pass.Pass());
124
125 if (needs_notification) 195 if (needs_notification)
126 context_->client()->didGetReadable(); 196 context_->Notify();
127 } 197 }
128 198
129 void SharedMemoryDataConsumerHandle::Writer::Close() { 199 void SharedMemoryDataConsumerHandle::Writer::Close() {
130 if (context_->result() == Ok) { 200 bool needs_notification = false;
131 context_->set_result(Done); 201
132 if (context_->client() && context_->IsEmpty()) 202 {
133 context_->client()->didGetReadable(); 203 base::AutoLock lock(context_->lock());
204 if (context_->result() == Ok) {
205 context_->set_result(Done);
206 needs_notification = context_->client() && context_->IsEmpty();
207 }
134 } 208 }
209 if (needs_notification)
210 context_->Notify();
135 } 211 }
136 212
137 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( 213 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
138 BackpressureMode mode, 214 BackpressureMode mode,
139 scoped_ptr<Writer>* writer) 215 scoped_ptr<Writer>* writer)
140 : context_(new Context) { 216 : context_(new Context) {
141 writer->reset(new Writer(context_, mode)); 217 writer->reset(new Writer(context_, mode));
142 } 218 }
143 219
144 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { 220 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
221 base::AutoLock lock(context_->lock());
145 context_->set_is_reader_active(false); 222 context_->set_is_reader_active(false);
146 context_->Clear(); 223 context_->Clear();
147 } 224 }
148 225
149 Result SharedMemoryDataConsumerHandle::read(void* data, 226 Result SharedMemoryDataConsumerHandle::read(void* data,
150 size_t size, 227 size_t size,
151 Flags flags, 228 Flags flags,
152 size_t* read_size_to_return) { 229 size_t* read_size_to_return) {
230 base::AutoLock lock(context_->lock());
231
153 size_t total_read_size = 0; 232 size_t total_read_size = 0;
154 *read_size_to_return = 0; 233 *read_size_to_return = 0;
155 if (context_->result() != Ok && context_->result() != Done) 234 if (context_->result() != Ok && context_->result() != Done)
156 return context_->result(); 235 return context_->result();
157 236
158 while (!context_->IsEmpty() && total_read_size < size) { 237 while (!context_->IsEmpty() && total_read_size < size) {
159 const auto& top = context_->Top(); 238 const auto& top = context_->Top();
160 size_t readable = top->length() - context_->first_offset(); 239 size_t readable = top->length() - context_->first_offset();
161 size_t writable = size - total_read_size; 240 size_t writable = size - total_read_size;
162 size_t read_size = std::min(readable, writable); 241 size_t read_size = std::min(readable, writable);
163 const char* begin = top->payload() + context_->first_offset(); 242 const char* begin = top->payload() + context_->first_offset();
164 std::copy(begin, begin + read_size, 243 std::copy(begin, begin + read_size,
165 static_cast<char*>(data) + total_read_size); 244 static_cast<char*>(data) + total_read_size);
166 total_read_size += read_size; 245 total_read_size += read_size;
167 context_->AdvanceFirstOffset(read_size); 246 context_->AdvanceFirstOffset(read_size);
168 context_->PopReadData(); 247 context_->PopReadData();
169 } 248 }
170 *read_size_to_return = total_read_size; 249 *read_size_to_return = total_read_size;
171 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; 250 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
172 } 251 }
173 252
174 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, 253 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
175 Flags flags, 254 Flags flags,
176 size_t* available) { 255 size_t* available) {
177 *buffer = nullptr; 256 *buffer = nullptr;
178 *available = 0; 257 *available = 0;
179 258
259 base::AutoLock lock(context_->lock());
260
180 if (context_->result() != Ok && context_->result() != Done) 261 if (context_->result() != Ok && context_->result() != Done)
181 return context_->result(); 262 return context_->result();
182 263
183 if (context_->IsEmpty()) 264 if (context_->IsEmpty())
184 return context_->result() == Done ? Done : ShouldWait; 265 return context_->result() == Done ? Done : ShouldWait;
185 266
186 const auto& top = context_->Top(); 267 const auto& top = context_->Top();
187 *buffer = top->payload() + context_->first_offset(); 268 *buffer = top->payload() + context_->first_offset();
188 *available = top->length() - context_->first_offset(); 269 *available = top->length() - context_->first_offset();
189 270
190 return Ok; 271 return Ok;
191 } 272 }
192 273
193 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { 274 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
275 base::AutoLock lock(context_->lock());
276
194 if (context_->IsEmpty()) 277 if (context_->IsEmpty())
195 return UnexpectedError; 278 return UnexpectedError;
196 279
197 context_->AdvanceFirstOffset(read_size); 280 context_->AdvanceFirstOffset(read_size);
198 context_->PopReadData(); 281 context_->PopReadData();
199 return Ok; 282 return Ok;
200 } 283 }
201 284
202 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { 285 void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
203 context_->set_client(client); 286 bool needs_notification = false;
287 {
288 base::AutoLock lock(context_->lock());
204 289
205 if (!context_->IsEmpty()) 290 context_->SetClient(client);
206 client->didGetReadable(); 291 needs_notification = !context_->IsEmpty();
292 }
293 if (needs_notification)
294 context_->Notify();
207 } 295 }
208 296
209 void SharedMemoryDataConsumerHandle::unregisterClient() { 297 void SharedMemoryDataConsumerHandle::unregisterClient() {
210 context_->set_client(nullptr); 298 base::AutoLock lock(context_->lock());
299
300 context_->SetClient(nullptr);
211 } 301 }
212 302
213 } // namespace content 303 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698