| Index: content/child/shared_memory_data_consumer_handle.cc
|
| diff --git a/content/child/shared_memory_data_consumer_handle.cc b/content/child/shared_memory_data_consumer_handle.cc
|
| index 71804dda2fcc0e22734e13601fc7e34c21758eb5..beb470d96d31e91d05214d1b13b4e1909ae242cc 100644
|
| --- a/content/child/shared_memory_data_consumer_handle.cc
|
| +++ b/content/child/shared_memory_data_consumer_handle.cc
|
| @@ -53,7 +53,8 @@ class SharedMemoryDataConsumerHandle::Context final
|
| : result_(Ok),
|
| first_offset_(0),
|
| client_(nullptr),
|
| - is_handle_active_(true) {}
|
| + is_handle_active_(true),
|
| + is_two_phase_read_in_progress_(false) {}
|
|
|
| bool IsEmpty() const { return queue_.empty(); }
|
| void ClearIfNecessary() {
|
| @@ -62,6 +63,13 @@ class SharedMemoryDataConsumerHandle::Context final
|
| Clear();
|
| }
|
| }
|
| + void ClearQueue() {
|
| + for (auto& data : queue_) {
|
| + delete data;
|
| + }
|
| + queue_.clear();
|
| + first_offset_ = 0;
|
| + }
|
| RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
|
| void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
|
| queue_.push_back(data.release());
|
| @@ -86,6 +94,16 @@ class SharedMemoryDataConsumerHandle::Context final
|
| notification_task_runner_ = nullptr;
|
| client_ = nullptr;
|
| }
|
| + void PostNotify() {
|
| + auto runner = notification_task_runner_;
|
| + if (!runner)
|
| + return;
|
| + // We don't re-post the task when the runner changes while waiting for
|
| + // this task because in this case a new reader is obtained and
|
| + // notification is already done at the reader creation time if necessary.
|
| + runner->PostTask(FROM_HERE,
|
| + base::Bind(&Context::NotifyInternal, this, false));
|
| + }
|
| void Notify() { NotifyInternal(true); }
|
| bool is_handle_locked() const { return notification_task_runner_; }
|
| bool IsReaderBoundToCurrentThread() const {
|
| @@ -103,6 +121,12 @@ class SharedMemoryDataConsumerHandle::Context final
|
| first_offset_ = 0;
|
| }
|
| }
|
| + bool is_two_phase_read_in_progress() const {
|
| + return is_two_phase_read_in_progress_;
|
| + }
|
| + void set_is_two_phase_read_in_progress(bool b) {
|
| + is_two_phase_read_in_progress_ = b;
|
| + }
|
| base::Lock& lock() { return lock_; }
|
|
|
| private:
|
| @@ -154,6 +178,7 @@ class SharedMemoryDataConsumerHandle::Context final
|
| Client* client_;
|
| scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
|
| bool is_handle_active_;
|
| + bool is_two_phase_read_in_progress_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(Context);
|
| };
|
| @@ -194,8 +219,12 @@ void SharedMemoryDataConsumerHandle::Writer::AddData(
|
| context_->Push(data_to_pass.Pass());
|
| }
|
|
|
| - if (needs_notification)
|
| + if (needs_notification) {
|
| + // We CAN issue the notification synchronously if the associated reader
|
| + // lives in this thread, because this function cannot be called in the
|
| + // client's callback.
|
| context_->Notify();
|
| + }
|
| }
|
|
|
| void SharedMemoryDataConsumerHandle::Writer::Close() {
|
| @@ -208,8 +237,37 @@ void SharedMemoryDataConsumerHandle::Writer::Close() {
|
| needs_notification = context_->IsEmpty();
|
| }
|
| }
|
| - if (needs_notification)
|
| - context_->Notify();
|
| + if (needs_notification) {
|
| + // We cannot issue the notification synchronously because this function can
|
| + // be called in the client's callback.
|
| + context_->PostNotify();
|
| + }
|
| +}
|
| +
|
| +void SharedMemoryDataConsumerHandle::Writer::Fail() {
|
| + bool needs_notification = false;
|
| + {
|
| + base::AutoLock lock(context_->lock());
|
| + if (context_->result() == Ok) {
|
| + // TODO(yhirano): Use an appropriate error code other than
|
| + // UnexpectedError.
|
| + context_->set_result(UnexpectedError);
|
| +
|
| + if (context_->is_two_phase_read_in_progress()) {
|
| + // If we are in two-phase read session, we cannot discard the data. We
|
| + // will clear the queue at the end of the session.
|
| + } else {
|
| + context_->ClearQueue();
|
| + }
|
| +
|
| + needs_notification = true;
|
| + }
|
| + }
|
| + if (needs_notification) {
|
| + // We cannot issue the notification synchronously because this function can
|
| + // be called in the client's callback.
|
| + context_->PostNotify();
|
| + }
|
| }
|
|
|
| SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
|
| @@ -236,6 +294,10 @@ Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
|
|
|
| size_t total_read_size = 0;
|
| *read_size_to_return = 0;
|
| +
|
| + if (context_->result() == Ok && context_->is_two_phase_read_in_progress())
|
| + context_->set_result(UnexpectedError);
|
| +
|
| if (context_->result() != Ok && context_->result() != Done)
|
| return context_->result();
|
|
|
| @@ -263,12 +325,16 @@ Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
|
|
|
| base::AutoLock lock(context_->lock());
|
|
|
| + if (context_->result() == Ok && context_->is_two_phase_read_in_progress())
|
| + context_->set_result(UnexpectedError);
|
| +
|
| if (context_->result() != Ok && context_->result() != Done)
|
| return context_->result();
|
|
|
| if (context_->IsEmpty())
|
| return context_->result() == Done ? Done : ShouldWait;
|
|
|
| + context_->set_is_two_phase_read_in_progress(true);
|
| const auto& top = context_->Top();
|
| *buffer = top->payload() + context_->first_offset();
|
| *available = top->length() - context_->first_offset();
|
| @@ -279,10 +345,17 @@ Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
|
| Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) {
|
| base::AutoLock lock(context_->lock());
|
|
|
| - if (context_->IsEmpty())
|
| + if (!context_->is_two_phase_read_in_progress())
|
| return UnexpectedError;
|
|
|
| - context_->Consume(read_size);
|
| + context_->set_is_two_phase_read_in_progress(false);
|
| + if (context_->result() != Ok && context_->result() != Done) {
|
| + // We have an error, so we can discard the stored data.
|
| + context_->ClearQueue();
|
| + } else {
|
| + context_->Consume(read_size);
|
| + }
|
| +
|
| return Ok;
|
| }
|
|
|
| @@ -309,62 +382,6 @@ SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
|
| return new ReaderImpl(context_, client);
|
| }
|
|
|
| -Result SharedMemoryDataConsumerHandle::read(void* data,
|
| - size_t size,
|
| - Flags flags,
|
| - size_t* read_size_to_return) {
|
| - // Note this (and below similar functions) is a bit racy. We don't care about
|
| - // it because this is a deprecated function and will be removed shortly.
|
| - LockImplicitly();
|
| - return reader_->read(data, size, flags, read_size_to_return);
|
| -}
|
| -
|
| -Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
|
| - Flags flags,
|
| - size_t* available) {
|
| - LockImplicitly();
|
| - return reader_->beginRead(buffer, flags, available);
|
| -}
|
| -
|
| -Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
|
| - LockImplicitly();
|
| - return reader_->endRead(read_size);
|
| -}
|
| -
|
| -void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
|
| - UnlockImplicitly();
|
| - reader_ = ObtainReader(client);
|
| -}
|
| -
|
| -void SharedMemoryDataConsumerHandle::unregisterClient() {
|
| - reader_.reset();
|
| -}
|
| -
|
| -void SharedMemoryDataConsumerHandle::LockImplicitly() {
|
| - {
|
| - base::AutoLock lock(context_->lock());
|
| - if (reader_) {
|
| - DCHECK(context_->IsReaderBoundToCurrentThread());
|
| - return;
|
| - }
|
| - }
|
| - reader_ = ObtainReader(nullptr);
|
| -}
|
| -
|
| -void SharedMemoryDataConsumerHandle::UnlockImplicitly() {
|
| - bool needs_unlock = false;
|
| - {
|
| - base::AutoLock lock(context_->lock());
|
| - if (reader_) {
|
| - DCHECK(context_->IsReaderBoundToCurrentThread());
|
| - needs_unlock = true;
|
| - }
|
| - }
|
| - if (needs_unlock) {
|
| - reader_.reset();
|
| - }
|
| -}
|
| -
|
| const char* SharedMemoryDataConsumerHandle::debugName() const {
|
| return "SharedMemoryDataConsumerHandle";
|
| }
|
|
|