Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "content/child/shared_memory_data_consumer_handle.h" | 5 #include "content/child/shared_memory_data_consumer_handle.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <deque> | 8 #include <deque> |
| 9 #include <vector> | |
| 10 | 9 |
| 11 #include "base/bind.h" | 10 #include "base/bind.h" |
| 12 #include "base/message_loop/message_loop.h" | 11 #include "base/message_loop/message_loop.h" |
| 13 #include "base/single_thread_task_runner.h" | 12 #include "base/single_thread_task_runner.h" |
| 14 #include "base/synchronization/lock.h" | 13 #include "base/synchronization/lock.h" |
| 15 #include "content/public/child/fixed_received_data.h" | 14 #include "content/public/child/fixed_received_data.h" |
| 16 | 15 |
| 17 namespace content { | 16 namespace content { |
| 18 | 17 |
| 19 namespace { | 18 namespace { |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 47 | 46 |
| 48 using Result = blink::WebDataConsumerHandle::Result; | 47 using Result = blink::WebDataConsumerHandle::Result; |
| 49 | 48 |
| 50 class SharedMemoryDataConsumerHandle::Context final | 49 class SharedMemoryDataConsumerHandle::Context final |
| 51 : public base::RefCountedThreadSafe<Context> { | 50 : public base::RefCountedThreadSafe<Context> { |
| 52 public: | 51 public: |
| 53 Context() | 52 Context() |
| 54 : result_(Ok), | 53 : result_(Ok), |
| 55 first_offset_(0), | 54 first_offset_(0), |
| 56 client_(nullptr), | 55 client_(nullptr), |
| 57 is_reader_active_(true) {} | 56 is_handle_active_(true) {} |
| 58 | 57 |
| 59 bool IsEmpty() const { return queue_.empty(); } | 58 bool IsEmpty() const { return queue_.empty(); } |
| 60 void Clear() { | 59 void Clear() { |
| 61 for (auto& data : queue_) { | 60 for (auto& data : queue_) { |
| 62 delete data; | 61 delete data; |
| 63 } | 62 } |
| 64 queue_.clear(); | 63 queue_.clear(); |
| 65 first_offset_ = 0; | 64 first_offset_ = 0; |
| 66 client_ = nullptr; | 65 client_ = nullptr; |
| 67 } | 66 } |
| 68 void Notify() { | |
| 69 // Note that this function is not protected by |lock_| (actually it | |
| 70 // shouldn't be) but |notification_task_runner_| is thread-safe. | |
| 71 | |
| 72 if (notification_task_runner_->BelongsToCurrentThread()) { | |
| 73 NotifyImmediately(); | |
| 74 } else { | |
| 75 notification_task_runner_->PostTask( | |
| 76 FROM_HERE, base::Bind(&Context::NotifyImmediately, this)); | |
| 77 } | |
| 78 } | |
| 79 | |
| 80 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } | 67 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } |
| 81 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { | 68 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { |
| 82 queue_.push_back(data.release()); | 69 queue_.push_back(data.release()); |
| 83 } | 70 } |
| 84 size_t first_offset() const { return first_offset_; } | 71 size_t first_offset() const { return first_offset_; } |
| 85 Result result() const { return result_; } | 72 Result result() const { return result_; } |
| 86 void set_result(Result r) { result_ = r; } | 73 void set_result(Result r) { result_ = r; } |
| 87 Client* client() { return client_; } | 74 void AcquireReaderLock(Client* client) { |
| 88 void SetClient(Client* client) { | 75 DCHECK(!notification_task_runner_); |
| 89 if (client) { | 76 DCHECK(!client_); |
| 90 notification_task_runner_ = base::MessageLoop::current()->task_runner(); | 77 notification_task_runner_ = base::MessageLoop::current()->task_runner(); |
| 91 client_ = client; | 78 client_ = client; |
| 92 } else { | 79 if (client && !(IsEmpty() && result() == Ok)) { |
| 93 notification_task_runner_ = nullptr; | 80 // We cannot notify synchronously because the user doesn't have the reader |
| 94 client_ = nullptr; | 81 // yet. |
| 82 notification_task_runner_->PostTask( | |
| 83 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false)); | |
| 95 } | 84 } |
| 96 } | 85 } |
| 97 bool is_reader_active() const { return is_reader_active_; } | 86 void ReleaseReaderLock() { |
| 98 void set_is_reader_active(bool b) { is_reader_active_ = b; } | 87 DCHECK(notification_task_runner_); |
| 88 notification_task_runner_ = nullptr; | |
| 89 client_ = nullptr; | |
| 90 } | |
| 91 void Notify() { NotifyInternal(true); } | |
| 92 bool is_handle_locked() const { return notification_task_runner_; } | |
| 93 bool IsReaderBoundToCurrentThread() const { | |
| 94 return notification_task_runner_ && | |
| 95 notification_task_runner_->BelongsToCurrentThread(); | |
| 96 } | |
| 97 bool is_handle_active() const { return is_handle_active_; } | |
| 98 void set_is_handle_active(bool b) { is_handle_active_ = b; } | |
| 99 void Consume(size_t s) { | 99 void Consume(size_t s) { |
| 100 first_offset_ += s; | 100 first_offset_ += s; |
| 101 auto top = Top(); | 101 auto top = Top(); |
| 102 if (static_cast<size_t>(top->length()) <= first_offset_) { | 102 if (static_cast<size_t>(top->length()) <= first_offset_) { |
| 103 delete top; | 103 delete top; |
| 104 queue_.pop_front(); | 104 queue_.pop_front(); |
| 105 first_offset_ = 0; | 105 first_offset_ = 0; |
| 106 } | 106 } |
| 107 } | 107 } |
| 108 base::Lock& lock() { return lock_; } | 108 base::Lock& lock() { return lock_; } |
| 109 | 109 |
| 110 private: | 110 private: |
| 111 void NotifyInternal(bool repost) { | |
| 112 // Note that this function is not protected by |lock_|. | |
| 113 | |
| 114 auto runner = notification_task_runner_; | |
| 115 if (!runner) { | |
| 116 // Do nothing. | |
| 117 } else if (runner->BelongsToCurrentThread()) { | |
| 118 // It is safe to access member variables without lock because |client_| | |
| 119 // is bound to the current thread. | |
| 120 if (client_) | |
| 121 client_->didGetReadable(); | |
| 122 } else if (repost) { | |
| 123 // We don't re-post the task when the runner changes while waiting for | |
| 124 // this task. | |
| 125 runner->PostTask(FROM_HERE, | |
| 126 base::Bind(&Context::NotifyInternal, this, false)); | |
| 127 } | |
| 128 } | |
| 129 | |
| 111 friend class base::RefCountedThreadSafe<Context>; | 130 friend class base::RefCountedThreadSafe<Context>; |
| 112 ~Context() { | 131 ~Context() { |
| 113 // This is necessary because the queue stores raw pointers. | 132 // This is necessary because the queue stores raw pointers. |
| 114 Clear(); | 133 Clear(); |
| 115 } | 134 } |
| 116 | 135 |
| 117 void NotifyImmediately() { | |
| 118 // As we can assume that all reader-side methods are called on this | |
| 119 // thread (see WebDataConsumerHandle comments), we don't need to lock. | |
| 120 if (client_) | |
| 121 client_->didGetReadable(); | |
| 122 } | |
| 123 | |
| 124 base::Lock lock_; | 136 base::Lock lock_; |
| 125 // |result_| stores the ultimate state of this handle if it has. Otherwise, | 137 // |result_| stores the ultimate state of this handle if it has. Otherwise, |
| 126 // |Ok| is set. | 138 // |Ok| is set. |
| 127 Result result_; | 139 Result result_; |
| 128 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> | 140 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> |
| 129 // once it is allowed. | 141 // once it is allowed. |
| 130 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_; | 142 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_; |
| 131 size_t first_offset_; | 143 size_t first_offset_; |
| 132 Client* client_; | 144 Client* client_; |
| 133 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; | 145 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; |
| 134 bool is_reader_active_; | 146 bool is_handle_active_; |
| 135 | 147 |
| 136 DISALLOW_COPY_AND_ASSIGN(Context); | 148 DISALLOW_COPY_AND_ASSIGN(Context); |
| 137 }; | 149 }; |
| 138 | 150 |
| 139 SharedMemoryDataConsumerHandle::Writer::Writer( | 151 SharedMemoryDataConsumerHandle::Writer::Writer( |
| 140 const scoped_refptr<Context>& context, | 152 const scoped_refptr<Context>& context, |
| 141 BackpressureMode mode) | 153 BackpressureMode mode) |
| 142 : context_(context), mode_(mode) { | 154 : context_(context), mode_(mode) { |
| 143 } | 155 } |
| 144 | 156 |
| 145 SharedMemoryDataConsumerHandle::Writer::~Writer() { | 157 SharedMemoryDataConsumerHandle::Writer::~Writer() { |
| 146 Close(); | 158 Close(); |
| 147 } | 159 } |
| 148 | 160 |
| 149 void SharedMemoryDataConsumerHandle::Writer::AddData( | 161 void SharedMemoryDataConsumerHandle::Writer::AddData( |
| 150 scoped_ptr<RequestPeer::ReceivedData> data) { | 162 scoped_ptr<RequestPeer::ReceivedData> data) { |
| 151 if (!data->length()) { | 163 if (!data->length()) { |
| 152 // We omit empty data. | 164 // We omit empty data. |
| 153 return; | 165 return; |
| 154 } | 166 } |
| 155 | 167 |
| 156 bool needs_notification = false; | 168 bool needs_notification = false; |
| 157 { | 169 { |
| 158 base::AutoLock lock(context_->lock()); | 170 base::AutoLock lock(context_->lock()); |
| 159 if (!context_->is_reader_active()) { | 171 if (!context_->is_handle_active() && !context_->is_handle_locked()) { |
| 160 // No one is interested in the data. | 172 // No one is interested in the data. |
| 161 return; | 173 return; |
| 162 } | 174 } |
| 163 | 175 |
| 164 needs_notification = context_->client() && context_->IsEmpty(); | 176 needs_notification = context_->IsEmpty(); |
| 165 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass; | 177 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass; |
| 166 if (mode_ == kApplyBackpressure) { | 178 if (mode_ == kApplyBackpressure) { |
| 167 data_to_pass = | 179 data_to_pass = |
| 168 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass())); | 180 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass())); |
| 169 } else { | 181 } else { |
| 170 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get())); | 182 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get())); |
| 171 } | 183 } |
| 172 context_->Push(data_to_pass.Pass()); | 184 context_->Push(data_to_pass.Pass()); |
| 173 } | 185 } |
| 174 | 186 |
| 175 if (needs_notification) | 187 if (needs_notification) |
| 176 context_->Notify(); | 188 context_->Notify(); |
| 177 } | 189 } |
| 178 | 190 |
| 179 void SharedMemoryDataConsumerHandle::Writer::Close() { | 191 void SharedMemoryDataConsumerHandle::Writer::Close() { |
| 180 bool needs_notification = false; | 192 bool needs_notification = false; |
| 181 | 193 |
| 182 { | 194 { |
| 183 base::AutoLock lock(context_->lock()); | 195 base::AutoLock lock(context_->lock()); |
| 184 if (context_->result() == Ok) { | 196 if (context_->result() == Ok) { |
| 185 context_->set_result(Done); | 197 context_->set_result(Done); |
| 186 needs_notification = context_->client() && context_->IsEmpty(); | 198 needs_notification = context_->IsEmpty(); |
| 187 } | 199 } |
| 188 } | 200 } |
| 189 if (needs_notification) | 201 if (needs_notification) |
| 190 context_->Notify(); | 202 context_->Notify(); |
| 191 } | 203 } |
| 192 | 204 |
| 193 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( | 205 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl( |
| 194 BackpressureMode mode, | 206 scoped_refptr<Context> context, |
| 195 scoped_ptr<Writer>* writer) | 207 Client* client) |
| 196 : context_(new Context) { | 208 : context_(context) { |
| 197 writer->reset(new Writer(context_, mode)); | 209 DCHECK(!context_->is_handle_locked()); |
| 210 context_->AcquireReaderLock(client); | |
| 198 } | 211 } |
| 199 | 212 |
| 200 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { | 213 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() { |
| 201 base::AutoLock lock(context_->lock()); | 214 context_->ReleaseReaderLock(); |
| 202 context_->set_is_reader_active(false); | |
| 203 context_->Clear(); | |
| 204 } | 215 } |
| 205 | 216 |
| 206 Result SharedMemoryDataConsumerHandle::read(void* data, | 217 Result SharedMemoryDataConsumerHandle::ReaderImpl::read( |
| 207 size_t size, | 218 void* data, |
| 208 Flags flags, | 219 size_t size, |
| 209 size_t* read_size_to_return) { | 220 Flags flags, |
| 221 size_t* read_size_to_return) { | |
| 210 base::AutoLock lock(context_->lock()); | 222 base::AutoLock lock(context_->lock()); |
| 211 | 223 |
| 212 size_t total_read_size = 0; | 224 size_t total_read_size = 0; |
| 213 *read_size_to_return = 0; | 225 *read_size_to_return = 0; |
| 214 if (context_->result() != Ok && context_->result() != Done) | 226 if (context_->result() != Ok && context_->result() != Done) |
| 215 return context_->result(); | 227 return context_->result(); |
| 216 | 228 |
| 217 while (!context_->IsEmpty() && total_read_size < size) { | 229 while (!context_->IsEmpty() && total_read_size < size) { |
| 218 const auto& top = context_->Top(); | 230 const auto& top = context_->Top(); |
| 219 size_t readable = top->length() - context_->first_offset(); | 231 size_t readable = top->length() - context_->first_offset(); |
| 220 size_t writable = size - total_read_size; | 232 size_t writable = size - total_read_size; |
| 221 size_t read_size = std::min(readable, writable); | 233 size_t read_size = std::min(readable, writable); |
| 222 const char* begin = top->payload() + context_->first_offset(); | 234 const char* begin = top->payload() + context_->first_offset(); |
| 223 std::copy(begin, begin + read_size, | 235 std::copy(begin, begin + read_size, |
| 224 static_cast<char*>(data) + total_read_size); | 236 static_cast<char*>(data) + total_read_size); |
| 225 total_read_size += read_size; | 237 total_read_size += read_size; |
| 226 context_->Consume(read_size); | 238 context_->Consume(read_size); |
| 227 } | 239 } |
| 228 *read_size_to_return = total_read_size; | 240 *read_size_to_return = total_read_size; |
| 229 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; | 241 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; |
| 230 } | 242 } |
| 231 | 243 |
| 232 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, | 244 Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead( |
| 233 Flags flags, | 245 const void** buffer, |
| 234 size_t* available) { | 246 Flags flags, |
| 247 size_t* available) { | |
| 235 *buffer = nullptr; | 248 *buffer = nullptr; |
| 236 *available = 0; | 249 *available = 0; |
| 237 | 250 |
| 238 base::AutoLock lock(context_->lock()); | 251 base::AutoLock lock(context_->lock()); |
| 239 | 252 |
| 240 if (context_->result() != Ok && context_->result() != Done) | 253 if (context_->result() != Ok && context_->result() != Done) |
| 241 return context_->result(); | 254 return context_->result(); |
| 242 | 255 |
| 243 if (context_->IsEmpty()) | 256 if (context_->IsEmpty()) |
| 244 return context_->result() == Done ? Done : ShouldWait; | 257 return context_->result() == Done ? Done : ShouldWait; |
| 245 | 258 |
| 246 const auto& top = context_->Top(); | 259 const auto& top = context_->Top(); |
| 247 *buffer = top->payload() + context_->first_offset(); | 260 *buffer = top->payload() + context_->first_offset(); |
| 248 *available = top->length() - context_->first_offset(); | 261 *available = top->length() - context_->first_offset(); |
| 249 | 262 |
| 250 return Ok; | 263 return Ok; |
| 251 } | 264 } |
| 252 | 265 |
| 253 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { | 266 Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) { |
| 254 base::AutoLock lock(context_->lock()); | 267 base::AutoLock lock(context_->lock()); |
| 255 | 268 |
| 256 if (context_->IsEmpty()) | 269 if (context_->IsEmpty()) |
| 257 return UnexpectedError; | 270 return UnexpectedError; |
| 258 | 271 |
| 259 context_->Consume(read_size); | 272 context_->Consume(read_size); |
| 260 return Ok; | 273 return Ok; |
| 261 } | 274 } |
| 262 | 275 |
| 276 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( | |
| 277 BackpressureMode mode, | |
| 278 scoped_ptr<Writer>* writer) | |
| 279 : context_(new Context) { | |
| 280 writer->reset(new Writer(context_, mode)); | |
| 281 } | |
| 282 | |
| 283 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { | |
| 284 base::AutoLock lock(context_->lock()); | |
| 285 context_->set_is_handle_active(false); | |
| 286 context_->Clear(); | |
|
hiroshige
2015/06/11 00:03:30
Do not call Clear() here because a reader might be
yhirano
2015/06/11 07:24:06
Thanks, done.
| |
| 287 } | |
| 288 | |
| 289 SharedMemoryDataConsumerHandle::ReaderImpl* | |
| 290 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) { | |
| 291 base::AutoLock lock(context_->lock()); | |
| 292 scoped_ptr<ReaderImpl> reader; | |
| 293 | |
| 294 reader.reset(new ReaderImpl(context_, client)); | |
| 295 return reader.release(); | |
| 296 } | |
| 297 | |
| 298 Result SharedMemoryDataConsumerHandle::read(void* data, | |
| 299 size_t size, | |
| 300 Flags flags, | |
| 301 size_t* read_size_to_return) { | |
| 302 // Note this (and below similar functions) is a bit racy. We don't care about | |
| 303 // it because this is a deprecated function and will be removed shortly. | |
| 304 LockImplicitly(); | |
| 305 return reader_->read(data, size, flags, read_size_to_return); | |
| 306 } | |
| 307 | |
| 308 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, | |
| 309 Flags flags, | |
| 310 size_t* available) { | |
| 311 LockImplicitly(); | |
| 312 return reader_->beginRead(buffer, flags, available); | |
| 313 } | |
| 314 | |
| 315 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { | |
| 316 LockImplicitly(); | |
| 317 return reader_->endRead(read_size); | |
| 318 } | |
| 319 | |
| 263 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { | 320 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { |
| 264 bool needs_notification = false; | 321 UnlockImplicitly(); |
| 265 { | 322 reader_ = obtainReader(client); |
| 266 base::AutoLock lock(context_->lock()); | |
| 267 | |
| 268 context_->SetClient(client); | |
| 269 needs_notification = !context_->IsEmpty(); | |
| 270 } | |
| 271 if (needs_notification) | |
| 272 context_->Notify(); | |
| 273 } | 323 } |
| 274 | 324 |
| 275 void SharedMemoryDataConsumerHandle::unregisterClient() { | 325 void SharedMemoryDataConsumerHandle::unregisterClient() { |
| 276 base::AutoLock lock(context_->lock()); | 326 reader_.reset(); |
| 327 } | |
| 277 | 328 |
| 278 context_->SetClient(nullptr); | 329 void SharedMemoryDataConsumerHandle::LockImplicitly() { |
| 330 { | |
| 331 base::AutoLock lock(context_->lock()); | |
| 332 if (reader_) { | |
| 333 DCHECK(context_->IsReaderBoundToCurrentThread()); | |
| 334 return; | |
| 335 } | |
| 336 } | |
| 337 reader_ = obtainReader(nullptr); | |
| 338 } | |
| 339 | |
| 340 void SharedMemoryDataConsumerHandle::UnlockImplicitly() { | |
| 341 bool needs_unlock = false; | |
| 342 { | |
| 343 base::AutoLock lock(context_->lock()); | |
| 344 if (reader_) { | |
| 345 DCHECK(context_->IsReaderBoundToCurrentThread()); | |
| 346 needs_unlock = true; | |
| 347 } | |
| 348 } | |
| 349 if (needs_unlock) { | |
| 350 reader_.reset(); | |
| 351 } | |
| 279 } | 352 } |
| 280 | 353 |
| 281 } // namespace content | 354 } // namespace content |
| OLD | NEW |