| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "content/child/shared_memory_data_consumer_handle.h" | 5 #include "content/child/shared_memory_data_consumer_handle.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <deque> | 8 #include <deque> |
| 9 #include <vector> | 9 #include <vector> |
| 10 | 10 |
| 11 #include "base/bind.h" |
| 12 #include "base/message_loop/message_loop.h" |
| 13 #include "base/single_thread_task_runner.h" |
| 14 #include "base/synchronization/lock.h" |
| 11 #include "content/public/child/fixed_received_data.h" | 15 #include "content/public/child/fixed_received_data.h" |
| 12 | 16 |
| 13 namespace content { | 17 namespace content { |
| 14 | 18 |
| 19 namespace { |
| 20 |
| 21 class DelegateThreadSafeReceivedData final |
| 22 : public RequestPeer::ThreadSafeReceivedData { |
| 23 public: |
| 24 explicit DelegateThreadSafeReceivedData( |
| 25 scoped_ptr<RequestPeer::ReceivedData> data) |
| 26 : data_(data.Pass()), |
| 27 task_runner_(base::MessageLoop::current()->task_runner()) {} |
| 28 ~DelegateThreadSafeReceivedData() override { |
| 29 if (!task_runner_->BelongsToCurrentThread()) { |
| 30 // Delete the data on the original thread. |
| 31 task_runner_->DeleteSoon(FROM_HERE, data_.release()); |
| 32 } |
| 33 } |
| 34 |
| 35 const char* payload() const override { return data_->payload(); } |
| 36 int length() const override { return data_->length(); } |
| 37 int encoded_length() const override { return data_->encoded_length(); } |
| 38 |
| 39 private: |
| 40 scoped_ptr<RequestPeer::ReceivedData> data_; |
| 41 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 42 |
| 43 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData); |
| 44 }; |
| 45 |
| 46 } // namespace |
| 47 |
| 15 using Result = blink::WebDataConsumerHandle::Result; | 48 using Result = blink::WebDataConsumerHandle::Result; |
| 16 | 49 |
| 17 class SharedMemoryDataConsumerHandle::Context final | 50 class SharedMemoryDataConsumerHandle::Context final |
| 18 : public base::RefCountedThreadSafe<Context> { | 51 : public base::RefCountedThreadSafe<Context> { |
| 19 public: | 52 public: |
| 20 Context() | 53 Context() |
| 21 : result_(Ok), | 54 : result_(Ok), |
| 22 first_offset_(0), | 55 first_offset_(0), |
| 23 client_(nullptr), | 56 client_(nullptr), |
| 24 is_reader_active_(true) {} | 57 is_reader_active_(true) {} |
| 25 | 58 |
| 26 bool IsEmpty() const { return queue_.empty(); } | 59 bool IsEmpty() const { return queue_.empty(); } |
| 27 void Clear() { | 60 void Clear() { |
| 28 for (RequestPeer::ReceivedData* data : queue_) { | 61 for (auto& data : queue_) { |
| 29 delete data; | 62 delete data; |
| 30 } | 63 } |
| 31 queue_.clear(); | 64 queue_.clear(); |
| 32 first_offset_ = 0; | 65 first_offset_ = 0; |
| 33 client_ = nullptr; | 66 client_ = nullptr; |
| 34 } | 67 } |
| 35 RequestPeer::ReceivedData* Top() { return queue_.front(); } | 68 void Notify() { |
| 36 void Push(scoped_ptr<RequestPeer::ReceivedData> data) { | 69 // Note that this function is not protected by |lock_| (actually it |
| 70 // shouldn't be) but |notification_task_runner_| is thread-safe. |
| 71 |
| 72 if (notification_task_runner_->BelongsToCurrentThread()) { |
| 73 NotifyImmediately(); |
| 74 } else { |
| 75 notification_task_runner_->PostTask( |
| 76 FROM_HERE, base::Bind(&Context::NotifyImmediately, this)); |
| 77 } |
| 78 } |
| 79 |
| 80 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } |
| 81 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { |
| 37 queue_.push_back(data.release()); | 82 queue_.push_back(data.release()); |
| 38 } | 83 } |
| 39 size_t first_offset() const { return first_offset_; } | 84 size_t first_offset() const { return first_offset_; } |
| 40 Result result() const { return result_; } | 85 Result result() const { return result_; } |
| 41 void set_result(Result r) { result_ = r; } | 86 void set_result(Result r) { result_ = r; } |
| 42 Client* client() { return client_; } | 87 Client* client() { return client_; } |
| 43 void set_client(Client* client) { client_ = client; } | 88 void SetClient(Client* client) { |
| 89 if (client) { |
| 90 notification_task_runner_ = base::MessageLoop::current()->task_runner(); |
| 91 client_ = client; |
| 92 } else { |
| 93 notification_task_runner_ = nullptr; |
| 94 client_ = nullptr; |
| 95 } |
| 96 } |
| 44 bool is_reader_active() const { return is_reader_active_; } | 97 bool is_reader_active() const { return is_reader_active_; } |
| 45 void set_is_reader_active(bool b) { is_reader_active_ = b; } | 98 void set_is_reader_active(bool b) { is_reader_active_ = b; } |
| 46 void Consume(size_t s) { | 99 void Consume(size_t s) { |
| 47 first_offset_ += s; | 100 first_offset_ += s; |
| 48 RequestPeer::ReceivedData* top = Top(); | 101 auto top = Top(); |
| 49 if (static_cast<size_t>(top->length()) <= first_offset_) { | 102 if (static_cast<size_t>(top->length()) <= first_offset_) { |
| 50 delete top; | 103 delete top; |
| 51 queue_.pop_front(); | 104 queue_.pop_front(); |
| 52 first_offset_ = 0; | 105 first_offset_ = 0; |
| 53 } | 106 } |
| 54 } | 107 } |
| 108 base::Lock& lock() { return lock_; } |
| 55 | 109 |
| 56 private: | 110 private: |
| 57 friend class base::RefCountedThreadSafe<Context>; | 111 friend class base::RefCountedThreadSafe<Context>; |
| 58 ~Context() { | 112 ~Context() { |
| 59 // This is necessary because the queue stores raw pointers. | 113 // This is necessary because the queue stores raw pointers. |
| 60 Clear(); | 114 Clear(); |
| 61 } | 115 } |
| 62 | 116 |
| 117 void NotifyImmediately() { |
| 118 // As we can assume that all reader-side methods are called on this |
| 119 // thread (see WebDataConsumerHandle comments), we don't need to lock. |
| 120 if (client_) |
| 121 client_->didGetReadable(); |
| 122 } |
| 123 |
| 124 base::Lock lock_; |
| 63 // |result_| stores the ultimate state of this handle if it has. Otherwise, | 125 // |result_| stores the ultimate state of this handle if it has. Otherwise, |
| 64 // |Ok| is set. | 126 // |Ok| is set. |
| 65 Result result_; | 127 Result result_; |
| 66 // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed. | 128 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> |
| 67 std::deque<RequestPeer::ReceivedData*> queue_; | 129 // once it is allowed. |
| 130 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_; |
| 68 size_t first_offset_; | 131 size_t first_offset_; |
| 69 Client* client_; | 132 Client* client_; |
| 133 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; |
| 70 bool is_reader_active_; | 134 bool is_reader_active_; |
| 71 | 135 |
| 72 DISALLOW_COPY_AND_ASSIGN(Context); | 136 DISALLOW_COPY_AND_ASSIGN(Context); |
| 73 }; | 137 }; |
| 74 | 138 |
| 75 SharedMemoryDataConsumerHandle::Writer::Writer( | 139 SharedMemoryDataConsumerHandle::Writer::Writer( |
| 76 const scoped_refptr<Context>& context, | 140 const scoped_refptr<Context>& context, |
| 77 BackpressureMode mode) | 141 BackpressureMode mode) |
| 78 : context_(context), mode_(mode) { | 142 : context_(context), mode_(mode) { |
| 79 } | 143 } |
| 80 | 144 |
| 81 SharedMemoryDataConsumerHandle::Writer::~Writer() { | 145 SharedMemoryDataConsumerHandle::Writer::~Writer() { |
| 82 Close(); | 146 Close(); |
| 83 } | 147 } |
| 84 | 148 |
| 85 void SharedMemoryDataConsumerHandle::Writer::AddData( | 149 void SharedMemoryDataConsumerHandle::Writer::AddData( |
| 86 scoped_ptr<RequestPeer::ReceivedData> data) { | 150 scoped_ptr<RequestPeer::ReceivedData> data) { |
| 87 if (!data->length()) { | 151 if (!data->length()) { |
| 88 // We omit empty data. | 152 // We omit empty data. |
| 89 return; | 153 return; |
| 90 } | 154 } |
| 91 | 155 |
| 92 if (!context_->is_reader_active()) { | 156 bool needs_notification = false; |
| 93 // No one is interested in the data. | 157 { |
| 94 return; | 158 base::AutoLock lock(context_->lock()); |
| 159 if (!context_->is_reader_active()) { |
| 160 // No one is interested in the data. |
| 161 return; |
| 162 } |
| 163 |
| 164 needs_notification = context_->client() && context_->IsEmpty(); |
| 165 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass; |
| 166 if (mode_ == kApplyBackpressure) { |
| 167 data_to_pass = |
| 168 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass())); |
| 169 } else { |
| 170 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get())); |
| 171 } |
| 172 context_->Push(data_to_pass.Pass()); |
| 95 } | 173 } |
| 96 | 174 |
| 97 bool needs_notification = context_->client() && context_->IsEmpty(); | |
| 98 scoped_ptr<RequestPeer::ReceivedData> data_to_pass; | |
| 99 if (mode_ == kApplyBackpressure) { | |
| 100 data_to_pass = data.Pass(); | |
| 101 } else { | |
| 102 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get())); | |
| 103 } | |
| 104 context_->Push(data_to_pass.Pass()); | |
| 105 | |
| 106 if (needs_notification) | 175 if (needs_notification) |
| 107 context_->client()->didGetReadable(); | 176 context_->Notify(); |
| 108 } | 177 } |
| 109 | 178 |
| 110 void SharedMemoryDataConsumerHandle::Writer::Close() { | 179 void SharedMemoryDataConsumerHandle::Writer::Close() { |
| 111 if (context_->result() == Ok) { | 180 bool needs_notification = false; |
| 112 context_->set_result(Done); | 181 |
| 113 if (context_->client() && context_->IsEmpty()) | 182 { |
| 114 context_->client()->didGetReadable(); | 183 base::AutoLock lock(context_->lock()); |
| 184 if (context_->result() == Ok) { |
| 185 context_->set_result(Done); |
| 186 needs_notification = context_->client() && context_->IsEmpty(); |
| 187 } |
| 115 } | 188 } |
| 189 if (needs_notification) |
| 190 context_->Notify(); |
| 116 } | 191 } |
| 117 | 192 |
| 118 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( | 193 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( |
| 119 BackpressureMode mode, | 194 BackpressureMode mode, |
| 120 scoped_ptr<Writer>* writer) | 195 scoped_ptr<Writer>* writer) |
| 121 : context_(new Context) { | 196 : context_(new Context) { |
| 122 writer->reset(new Writer(context_, mode)); | 197 writer->reset(new Writer(context_, mode)); |
| 123 } | 198 } |
| 124 | 199 |
| 125 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { | 200 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { |
| 201 base::AutoLock lock(context_->lock()); |
| 126 context_->set_is_reader_active(false); | 202 context_->set_is_reader_active(false); |
| 127 context_->Clear(); | 203 context_->Clear(); |
| 128 } | 204 } |
| 129 | 205 |
| 130 Result SharedMemoryDataConsumerHandle::read(void* data, | 206 Result SharedMemoryDataConsumerHandle::read(void* data, |
| 131 size_t size, | 207 size_t size, |
| 132 Flags flags, | 208 Flags flags, |
| 133 size_t* read_size_to_return) { | 209 size_t* read_size_to_return) { |
| 210 base::AutoLock lock(context_->lock()); |
| 211 |
| 134 size_t total_read_size = 0; | 212 size_t total_read_size = 0; |
| 135 *read_size_to_return = 0; | 213 *read_size_to_return = 0; |
| 136 if (context_->result() != Ok && context_->result() != Done) | 214 if (context_->result() != Ok && context_->result() != Done) |
| 137 return context_->result(); | 215 return context_->result(); |
| 138 | 216 |
| 139 while (!context_->IsEmpty() && total_read_size < size) { | 217 while (!context_->IsEmpty() && total_read_size < size) { |
| 140 const auto& top = context_->Top(); | 218 const auto& top = context_->Top(); |
| 141 size_t readable = top->length() - context_->first_offset(); | 219 size_t readable = top->length() - context_->first_offset(); |
| 142 size_t writable = size - total_read_size; | 220 size_t writable = size - total_read_size; |
| 143 size_t read_size = std::min(readable, writable); | 221 size_t read_size = std::min(readable, writable); |
| 144 const char* begin = top->payload() + context_->first_offset(); | 222 const char* begin = top->payload() + context_->first_offset(); |
| 145 std::copy(begin, begin + read_size, | 223 std::copy(begin, begin + read_size, |
| 146 static_cast<char*>(data) + total_read_size); | 224 static_cast<char*>(data) + total_read_size); |
| 147 total_read_size += read_size; | 225 total_read_size += read_size; |
| 148 context_->Consume(read_size); | 226 context_->Consume(read_size); |
| 149 } | 227 } |
| 150 *read_size_to_return = total_read_size; | 228 *read_size_to_return = total_read_size; |
| 151 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; | 229 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; |
| 152 } | 230 } |
| 153 | 231 |
| 154 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, | 232 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, |
| 155 Flags flags, | 233 Flags flags, |
| 156 size_t* available) { | 234 size_t* available) { |
| 157 *buffer = nullptr; | 235 *buffer = nullptr; |
| 158 *available = 0; | 236 *available = 0; |
| 159 | 237 |
| 238 base::AutoLock lock(context_->lock()); |
| 239 |
| 160 if (context_->result() != Ok && context_->result() != Done) | 240 if (context_->result() != Ok && context_->result() != Done) |
| 161 return context_->result(); | 241 return context_->result(); |
| 162 | 242 |
| 163 if (context_->IsEmpty()) | 243 if (context_->IsEmpty()) |
| 164 return context_->result() == Done ? Done : ShouldWait; | 244 return context_->result() == Done ? Done : ShouldWait; |
| 165 | 245 |
| 166 const auto& top = context_->Top(); | 246 const auto& top = context_->Top(); |
| 167 *buffer = top->payload() + context_->first_offset(); | 247 *buffer = top->payload() + context_->first_offset(); |
| 168 *available = top->length() - context_->first_offset(); | 248 *available = top->length() - context_->first_offset(); |
| 169 | 249 |
| 170 return Ok; | 250 return Ok; |
| 171 } | 251 } |
| 172 | 252 |
| 173 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { | 253 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { |
| 254 base::AutoLock lock(context_->lock()); |
| 255 |
| 174 if (context_->IsEmpty()) | 256 if (context_->IsEmpty()) |
| 175 return UnexpectedError; | 257 return UnexpectedError; |
| 176 | 258 |
| 177 context_->Consume(read_size); | 259 context_->Consume(read_size); |
| 178 return Ok; | 260 return Ok; |
| 179 } | 261 } |
| 180 | 262 |
| 181 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { | 263 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { |
| 182 context_->set_client(client); | 264 bool needs_notification = false; |
| 265 { |
| 266 base::AutoLock lock(context_->lock()); |
| 183 | 267 |
| 184 if (!context_->IsEmpty()) | 268 context_->SetClient(client); |
| 185 client->didGetReadable(); | 269 needs_notification = !context_->IsEmpty(); |
| 270 } |
| 271 if (needs_notification) |
| 272 context_->Notify(); |
| 186 } | 273 } |
| 187 | 274 |
| 188 void SharedMemoryDataConsumerHandle::unregisterClient() { | 275 void SharedMemoryDataConsumerHandle::unregisterClient() { |
| 189 context_->set_client(nullptr); | 276 base::AutoLock lock(context_->lock()); |
| 277 |
| 278 context_->SetClient(nullptr); |
| 190 } | 279 } |
| 191 | 280 |
| 192 } // namespace content | 281 } // namespace content |
| OLD | NEW |