| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "content/child/shared_memory_data_consumer_handle.h" | 5 #include "content/child/shared_memory_data_consumer_handle.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <deque> | 8 #include <deque> |
| 9 #include <vector> | 9 #include <vector> |
| 10 | 10 |
| 11 #include "base/bind.h" |
| 12 #include "base/message_loop/message_loop.h" |
| 13 #include "base/single_thread_task_runner.h" |
| 14 #include "base/synchronization/lock.h" |
| 15 |
| 11 namespace content { | 16 namespace content { |
| 12 | 17 |
| 13 namespace { | 18 namespace { |
| 14 | 19 |
| 15 class FixedReceivedData final : public RequestPeer::ReceivedData { | 20 // A ThreadSafeReceivedData can be deleted on ANY thread. |
| 21 class ThreadSafeReceivedData : public RequestPeer::ReceivedData {}; |
| 22 |
| 23 class FixedReceivedData final : public ThreadSafeReceivedData { |
| 16 public: | 24 public: |
| 17 explicit FixedReceivedData(RequestPeer::ReceivedData* data) | 25 explicit FixedReceivedData(RequestPeer::ReceivedData* data) |
| 18 : data_(data->payload(), data->payload() + data->length()), | 26 : data_(data->payload(), data->payload() + data->length()), |
| 19 encoded_length_(data->encoded_length()) {} | 27 encoded_length_(data->encoded_length()) {} |
| 20 | 28 |
| 21 const char* payload() const override { | 29 const char* payload() const override { |
| 22 return data_.empty() ? nullptr : &data_[0]; | 30 return data_.empty() ? nullptr : &data_[0]; |
| 23 } | 31 } |
| 24 int length() const override { return static_cast<int>(data_.size()); } | 32 int length() const override { return static_cast<int>(data_.size()); } |
| 25 int encoded_length() const override { return encoded_length_; } | 33 int encoded_length() const override { return encoded_length_; } |
| 26 | 34 |
| 27 private: | 35 private: |
| 28 const std::vector<char> data_; | 36 const std::vector<char> data_; |
| 29 const int encoded_length_; | 37 const int encoded_length_; |
| 30 | 38 |
| 31 DISALLOW_COPY_AND_ASSIGN(FixedReceivedData); | 39 DISALLOW_COPY_AND_ASSIGN(FixedReceivedData); |
| 32 }; | 40 }; |
| 33 | 41 |
| 42 class DelegateThreadSafeReceivedData final : public ThreadSafeReceivedData { |
| 43 public: |
| 44 explicit DelegateThreadSafeReceivedData( |
| 45 scoped_ptr<RequestPeer::ReceivedData> data) |
| 46 : data_(data.Pass()), |
| 47 task_runner_(base::MessageLoop::current()->task_runner()) {} |
| 48 ~DelegateThreadSafeReceivedData() override { |
| 49 if (!task_runner_->BelongsToCurrentThread()) { |
| 50 // Delete the data on the original thread. |
| 51 task_runner_->DeleteSoon(FROM_HERE, data_.release()); |
| 52 } |
| 53 } |
| 54 |
| 55 const char* payload() const override { return data_->payload(); } |
| 56 int length() const override { return data_->length(); } |
| 57 int encoded_length() const override { return data_->encoded_length(); } |
| 58 |
| 59 private: |
| 60 scoped_ptr<RequestPeer::ReceivedData> data_; |
| 61 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 62 |
| 63 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData); |
| 64 }; |
| 65 |
| 34 } // namespace | 66 } // namespace |
| 35 | 67 |
| 36 using Result = blink::WebDataConsumerHandle::Result; | 68 using Result = blink::WebDataConsumerHandle::Result; |
| 37 using Data = RequestPeer::ReceivedData; | |
| 38 | 69 |
| 39 class SharedMemoryDataConsumerHandle::Context final | 70 class SharedMemoryDataConsumerHandle::Context final |
| 40 : public base::RefCountedThreadSafe<Context> { | 71 : public base::RefCountedThreadSafe<Context> { |
| 41 public: | 72 public: |
| 42 Context() | 73 Context() |
| 43 : result_(Ok), | 74 : result_(Ok), |
| 44 first_offset_(0), | 75 first_offset_(0), |
| 45 client_(nullptr), | 76 client_(nullptr), |
| 46 is_reader_active_(true) {} | 77 is_reader_active_(true) {} |
| 47 | 78 |
| 48 bool IsEmpty() const { return queue_.empty(); } | 79 bool IsEmpty() const { return queue_.empty(); } |
| 49 void Clear() { | 80 void Clear() { |
| 50 for (Data* data : queue_) { | 81 for (auto& data : queue_) { |
| 51 delete data; | 82 delete data; |
| 52 } | 83 } |
| 53 queue_.clear(); | 84 queue_.clear(); |
| 54 first_offset_ = 0; | 85 first_offset_ = 0; |
| 55 client_ = nullptr; | 86 client_ = nullptr; |
| 56 } | 87 } |
| 57 void PopReadData() { | 88 void PopReadData() { |
| 58 Data* top = Top(); | 89 auto top = Top(); |
| 59 if (static_cast<size_t>(top->length()) <= first_offset_) { | 90 if (static_cast<size_t>(top->length()) <= first_offset_) { |
| 60 delete top; | 91 delete top; |
| 61 queue_.pop_front(); | 92 queue_.pop_front(); |
| 62 first_offset_ = 0; | 93 first_offset_ = 0; |
| 63 } | 94 } |
| 64 } | 95 } |
| 65 Data* Top() { return queue_.front(); } | 96 void Notify() { |
| 66 void Push(scoped_ptr<Data> data) { queue_.push_back(data.release()); } | 97 // Note that this function is not protected by |lock_| (actually it |
| 98 // shouldn't be) but |notification_task_runner_| is thread-safe. |
| 99 |
| 100 if (notification_task_runner_->BelongsToCurrentThread()) { |
| 101 NotifyImmediately(); |
| 102 } else { |
| 103 notification_task_runner_->PostTask( |
| 104 FROM_HERE, base::Bind(&Context::NotifyImmediately, this)); |
| 105 } |
| 106 } |
| 107 |
| 108 ThreadSafeReceivedData* Top() { return queue_.front(); } |
| 109 void Push(scoped_ptr<ThreadSafeReceivedData> data) { |
| 110 queue_.push_back(data.release()); |
| 111 } |
| 67 size_t first_offset() const { return first_offset_; } | 112 size_t first_offset() const { return first_offset_; } |
| 68 void AdvanceFirstOffset(size_t s) { first_offset_ += s; } | 113 void AdvanceFirstOffset(size_t s) { first_offset_ += s; } |
| 69 Result result() const { return result_; } | 114 Result result() const { return result_; } |
| 70 void set_result(Result r) { result_ = r; } | 115 void set_result(Result r) { result_ = r; } |
| 71 Client* client() { return client_; } | 116 Client* client() { return client_; } |
| 72 void set_client(Client* client) { client_ = client; } | 117 void SetClient(Client* client) { |
| 118 if (client) { |
| 119 notification_task_runner_ = base::MessageLoop::current()->task_runner(); |
| 120 client_ = client; |
| 121 } else { |
| 122 notification_task_runner_ = nullptr; |
| 123 client_ = nullptr; |
| 124 } |
| 125 } |
| 73 bool is_reader_active() const { return is_reader_active_; } | 126 bool is_reader_active() const { return is_reader_active_; } |
| 74 void set_is_reader_active(bool b) { is_reader_active_ = b; } | 127 void set_is_reader_active(bool b) { is_reader_active_ = b; } |
| 128 base::Lock& lock() { return lock_; } |
| 75 | 129 |
| 76 private: | 130 private: |
| 77 friend class base::RefCountedThreadSafe<Context>; | 131 friend class base::RefCountedThreadSafe<Context>; |
| 78 ~Context() { | 132 ~Context() { |
| 79 // This is necessary because the queue stores raw pointers. | 133 // This is necessary because the queue stores raw pointers. |
| 80 Clear(); | 134 Clear(); |
| 81 } | 135 } |
| 82 | 136 |
| 137 void NotifyImmediately() { |
| 138 // As we can assume that all reader-side methods are called on this |
| 139 // thread (see WebDataConsumerHandle comments), we don't need to lock. |
| 140 if (client_) |
| 141 client_->didGetReadable(); |
| 142 } |
| 143 |
| 144 base::Lock lock_; |
| 83 // |result_| stores the ultimate state of this handle if it has. Otherwise, | 145 // |result_| stores the ultimate state of this handle if it has. Otherwise, |
| 84 // |Ok| is set. | 146 // |Ok| is set. |
| 85 Result result_; | 147 Result result_; |
| 86 // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed. | 148 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> |
| 87 std::deque<RequestPeer::ReceivedData*> queue_; | 149 // once it is allowed. |
| 150 std::deque<ThreadSafeReceivedData*> queue_; |
| 88 size_t first_offset_; | 151 size_t first_offset_; |
| 89 Client* client_; | 152 Client* client_; |
| 153 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; |
| 90 bool is_reader_active_; | 154 bool is_reader_active_; |
| 91 | 155 |
| 92 DISALLOW_COPY_AND_ASSIGN(Context); | 156 DISALLOW_COPY_AND_ASSIGN(Context); |
| 93 }; | 157 }; |
| 94 | 158 |
| 95 SharedMemoryDataConsumerHandle::Writer::Writer( | 159 SharedMemoryDataConsumerHandle::Writer::Writer( |
| 96 const scoped_refptr<Context>& context, | 160 const scoped_refptr<Context>& context, |
| 97 BackpressureMode mode) | 161 BackpressureMode mode) |
| 98 : context_(context), mode_(mode) { | 162 : context_(context), mode_(mode) { |
| 99 } | 163 } |
| 100 | 164 |
| 101 SharedMemoryDataConsumerHandle::Writer::~Writer() { | 165 SharedMemoryDataConsumerHandle::Writer::~Writer() { |
| 102 Close(); | 166 Close(); |
| 103 } | 167 } |
| 104 | 168 |
| 105 void SharedMemoryDataConsumerHandle::Writer::AddData(scoped_ptr<Data> data) { | 169 void SharedMemoryDataConsumerHandle::Writer::AddData( |
| 170 scoped_ptr<RequestPeer::ReceivedData> data) { |
| 106 if (!data->length()) { | 171 if (!data->length()) { |
| 107 // We omit empty data. | 172 // We omit empty data. |
| 108 return; | 173 return; |
| 109 } | 174 } |
| 110 | 175 |
| 111 if (!context_->is_reader_active()) { | 176 bool needs_notification = false; |
| 112 // No one is interested in the data. | 177 { |
| 113 return; | 178 base::AutoLock lock(context_->lock()); |
| 179 if (!context_->is_reader_active()) { |
| 180 // No one is interested in the data. |
| 181 return; |
| 182 } |
| 183 |
| 184 needs_notification = context_->client() && context_->IsEmpty(); |
| 185 scoped_ptr<ThreadSafeReceivedData> data_to_pass; |
| 186 if (mode_ == kApplyBackpressure) { |
| 187 data_to_pass = |
| 188 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass())); |
| 189 } else { |
| 190 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get())); |
| 191 } |
| 192 context_->Push(data_to_pass.Pass()); |
| 114 } | 193 } |
| 115 | 194 |
| 116 bool needs_notification = context_->client() && context_->IsEmpty(); | |
| 117 scoped_ptr<RequestPeer::ReceivedData> data_to_pass; | |
| 118 if (mode_ == kApplyBackpressure) { | |
| 119 data_to_pass = data.Pass(); | |
| 120 } else { | |
| 121 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get())); | |
| 122 } | |
| 123 context_->Push(data_to_pass.Pass()); | |
| 124 | |
| 125 if (needs_notification) | 195 if (needs_notification) |
| 126 context_->client()->didGetReadable(); | 196 context_->Notify(); |
| 127 } | 197 } |
| 128 | 198 |
| 129 void SharedMemoryDataConsumerHandle::Writer::Close() { | 199 void SharedMemoryDataConsumerHandle::Writer::Close() { |
| 130 if (context_->result() == Ok) { | 200 bool needs_notification = false; |
| 131 context_->set_result(Done); | 201 |
| 132 if (context_->client() && context_->IsEmpty()) | 202 { |
| 133 context_->client()->didGetReadable(); | 203 base::AutoLock lock(context_->lock()); |
| 204 if (context_->result() == Ok) { |
| 205 context_->set_result(Done); |
| 206 needs_notification = context_->client() && context_->IsEmpty(); |
| 207 } |
| 134 } | 208 } |
| 209 if (needs_notification) |
| 210 context_->Notify(); |
| 135 } | 211 } |
| 136 | 212 |
| 137 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( | 213 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( |
| 138 BackpressureMode mode, | 214 BackpressureMode mode, |
| 139 scoped_ptr<Writer>* writer) | 215 scoped_ptr<Writer>* writer) |
| 140 : context_(new Context) { | 216 : context_(new Context) { |
| 141 writer->reset(new Writer(context_, mode)); | 217 writer->reset(new Writer(context_, mode)); |
| 142 } | 218 } |
| 143 | 219 |
| 144 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { | 220 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { |
| 221 base::AutoLock lock(context_->lock()); |
| 145 context_->set_is_reader_active(false); | 222 context_->set_is_reader_active(false); |
| 146 context_->Clear(); | 223 context_->Clear(); |
| 147 } | 224 } |
| 148 | 225 |
| 149 Result SharedMemoryDataConsumerHandle::read(void* data, | 226 Result SharedMemoryDataConsumerHandle::read(void* data, |
| 150 size_t size, | 227 size_t size, |
| 151 Flags flags, | 228 Flags flags, |
| 152 size_t* read_size_to_return) { | 229 size_t* read_size_to_return) { |
| 230 base::AutoLock lock(context_->lock()); |
| 231 |
| 153 size_t total_read_size = 0; | 232 size_t total_read_size = 0; |
| 154 *read_size_to_return = 0; | 233 *read_size_to_return = 0; |
| 155 if (context_->result() != Ok && context_->result() != Done) | 234 if (context_->result() != Ok && context_->result() != Done) |
| 156 return context_->result(); | 235 return context_->result(); |
| 157 | 236 |
| 158 while (!context_->IsEmpty() && total_read_size < size) { | 237 while (!context_->IsEmpty() && total_read_size < size) { |
| 159 const auto& top = context_->Top(); | 238 const auto& top = context_->Top(); |
| 160 size_t readable = top->length() - context_->first_offset(); | 239 size_t readable = top->length() - context_->first_offset(); |
| 161 size_t writable = size - total_read_size; | 240 size_t writable = size - total_read_size; |
| 162 size_t read_size = std::min(readable, writable); | 241 size_t read_size = std::min(readable, writable); |
| 163 const char* begin = top->payload() + context_->first_offset(); | 242 const char* begin = top->payload() + context_->first_offset(); |
| 164 std::copy(begin, begin + read_size, | 243 std::copy(begin, begin + read_size, |
| 165 static_cast<char*>(data) + total_read_size); | 244 static_cast<char*>(data) + total_read_size); |
| 166 total_read_size += read_size; | 245 total_read_size += read_size; |
| 167 context_->AdvanceFirstOffset(read_size); | 246 context_->AdvanceFirstOffset(read_size); |
| 168 context_->PopReadData(); | 247 context_->PopReadData(); |
| 169 } | 248 } |
| 170 *read_size_to_return = total_read_size; | 249 *read_size_to_return = total_read_size; |
| 171 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; | 250 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; |
| 172 } | 251 } |
| 173 | 252 |
| 174 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, | 253 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, |
| 175 Flags flags, | 254 Flags flags, |
| 176 size_t* available) { | 255 size_t* available) { |
| 177 *buffer = nullptr; | 256 *buffer = nullptr; |
| 178 *available = 0; | 257 *available = 0; |
| 179 | 258 |
| 259 base::AutoLock lock(context_->lock()); |
| 260 |
| 180 if (context_->result() != Ok && context_->result() != Done) | 261 if (context_->result() != Ok && context_->result() != Done) |
| 181 return context_->result(); | 262 return context_->result(); |
| 182 | 263 |
| 183 if (context_->IsEmpty()) | 264 if (context_->IsEmpty()) |
| 184 return context_->result() == Done ? Done : ShouldWait; | 265 return context_->result() == Done ? Done : ShouldWait; |
| 185 | 266 |
| 186 const auto& top = context_->Top(); | 267 const auto& top = context_->Top(); |
| 187 *buffer = top->payload() + context_->first_offset(); | 268 *buffer = top->payload() + context_->first_offset(); |
| 188 *available = top->length() - context_->first_offset(); | 269 *available = top->length() - context_->first_offset(); |
| 189 | 270 |
| 190 return Ok; | 271 return Ok; |
| 191 } | 272 } |
| 192 | 273 |
| 193 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { | 274 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { |
| 275 base::AutoLock lock(context_->lock()); |
| 276 |
| 194 if (context_->IsEmpty()) | 277 if (context_->IsEmpty()) |
| 195 return UnexpectedError; | 278 return UnexpectedError; |
| 196 | 279 |
| 197 context_->AdvanceFirstOffset(read_size); | 280 context_->AdvanceFirstOffset(read_size); |
| 198 context_->PopReadData(); | 281 context_->PopReadData(); |
| 199 return Ok; | 282 return Ok; |
| 200 } | 283 } |
| 201 | 284 |
| 202 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { | 285 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { |
| 203 context_->set_client(client); | 286 bool needs_notification = false; |
| 287 { |
| 288 base::AutoLock lock(context_->lock()); |
| 204 | 289 |
| 205 if (!context_->IsEmpty()) | 290 context_->SetClient(client); |
| 206 client->didGetReadable(); | 291 needs_notification = !context_->IsEmpty(); |
| 292 } |
| 293 if (needs_notification) |
| 294 context_->Notify(); |
| 207 } | 295 } |
| 208 | 296 |
| 209 void SharedMemoryDataConsumerHandle::unregisterClient() { | 297 void SharedMemoryDataConsumerHandle::unregisterClient() { |
| 210 context_->set_client(nullptr); | 298 base::AutoLock lock(context_->lock()); |
| 299 |
| 300 context_->SetClient(nullptr); |
| 211 } | 301 } |
| 212 | 302 |
| 213 } // namespace content | 303 } // namespace content |
| OLD | NEW |