| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/mojo/ipc_message_pipe_reader.h" | 5 #include "ipc/mojo/ipc_message_pipe_reader.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/bind_helpers.h" | 8 #include "base/bind_helpers.h" |
| 9 #include "base/location.h" | 9 #include "base/location.h" |
| 10 #include "base/logging.h" | 10 #include "base/logging.h" |
| 11 #include "base/single_thread_task_runner.h" | 11 #include "base/single_thread_task_runner.h" |
| 12 #include "base/thread_task_runner_handle.h" | 12 #include "base/thread_task_runner_handle.h" |
| 13 #include "ipc/mojo/async_handle_waiter.h" | 13 #include "ipc/mojo/async_handle_waiter.h" |
| 14 #include "ipc/mojo/ipc_channel_mojo.h" | 14 #include "ipc/mojo/ipc_channel_mojo.h" |
| 15 | 15 |
| 16 namespace IPC { | 16 namespace IPC { |
| 17 namespace internal { | 17 namespace internal { |
| 18 | 18 |
| 19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle, | 19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle, |
| 20 MessagePipeReader::Delegate* delegate) | 20 MessagePipeReader::Delegate* delegate) |
| 21 : pipe_(handle.Pass()), | 21 : pipe_(handle.Pass()), |
| 22 handle_copy_(pipe_.get().value()), |
| 22 delegate_(delegate), | 23 delegate_(delegate), |
| 23 async_waiter_( | 24 async_waiter_( |
| 24 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady, | 25 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady, |
| 25 base::Unretained(this)))), | 26 base::Unretained(this)))), |
| 26 pending_send_error_(MOJO_RESULT_OK) { | 27 pending_send_error_(MOJO_RESULT_OK) { |
| 27 } | 28 } |
| 28 | 29 |
| 29 MessagePipeReader::~MessagePipeReader() { | 30 MessagePipeReader::~MessagePipeReader() { |
| 31 DCHECK(thread_checker_.CalledOnValidThread()); |
| 30 // The pipe should be closed before deletion. | 32 // The pipe should be closed before deletion. |
| 31 CHECK(!IsValid()); | 33 CHECK(!IsValid()); |
| 32 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK); | |
| 33 } | 34 } |
| 34 | 35 |
| 35 void MessagePipeReader::Close() { | 36 void MessagePipeReader::Close() { |
| 36 // All pending errors should be signaled before Close(). | 37 DCHECK(thread_checker_.CalledOnValidThread()); |
| 37 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK); | |
| 38 async_waiter_.reset(); | 38 async_waiter_.reset(); |
| 39 pipe_.reset(); | 39 pipe_.reset(); |
| 40 OnPipeClosed(); | 40 OnPipeClosed(); |
| 41 } | 41 } |
| 42 | 42 |
| 43 void MessagePipeReader::CloseWithError(MojoResult error) { | 43 void MessagePipeReader::CloseWithError(MojoResult error) { |
| 44 DCHECK(thread_checker_.CalledOnValidThread()); |
| 44 OnPipeError(error); | 45 OnPipeError(error); |
| 45 Close(); | 46 Close(); |
| 46 } | 47 } |
| 47 | 48 |
| 48 void MessagePipeReader::CloseWithErrorIfPending() { | 49 void MessagePipeReader::CloseWithErrorIfPending() { |
| 49 if (pending_send_error_ == MOJO_RESULT_OK) | 50 DCHECK(thread_checker_.CalledOnValidThread()); |
| 51 MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_); |
| 52 if (pending_error == MOJO_RESULT_OK) |
| 50 return; | 53 return; |
| 51 MojoResult error = pending_send_error_; | 54 // NOTE: This races with Send(), and therefore the value of |
| 52 pending_send_error_ = MOJO_RESULT_OK; | 55 // pending_send_error() can change. |
| 53 CloseWithError(error); | 56 CloseWithError(pending_error); |
| 54 return; | 57 return; |
| 55 } | 58 } |
| 56 | 59 |
| 57 void MessagePipeReader::CloseWithErrorLater(MojoResult error) { | 60 void MessagePipeReader::CloseWithErrorLater(MojoResult error) { |
| 58 pending_send_error_ = error; | 61 DCHECK_NE(error, MOJO_RESULT_OK); |
| 62 // NOTE: No assumptions about the value of |pending_send_error_| or whether or |
| 63 // not the error has been signaled can be made. If Send() is called |
| 64 // immediately before Close() and errors, it's possible for the error to not |
| 65 // be signaled. |
| 66 base::subtle::NoBarrier_Store(&pending_send_error_, error); |
| 59 } | 67 } |
| 60 | 68 |
| 61 bool MessagePipeReader::Send(scoped_ptr<Message> message) { | 69 bool MessagePipeReader::Send(scoped_ptr<Message> message) { |
| 62 DCHECK(IsValid()); | |
| 63 | |
| 64 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 70 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 65 "MessagePipeReader::Send", | 71 "MessagePipeReader::Send", |
| 66 message->flags(), | 72 message->flags(), |
| 67 TRACE_EVENT_FLAG_FLOW_OUT); | 73 TRACE_EVENT_FLAG_FLOW_OUT); |
| 68 std::vector<MojoHandle> handles; | 74 std::vector<MojoHandle> handles; |
| 69 MojoResult result = MOJO_RESULT_OK; | 75 MojoResult result = MOJO_RESULT_OK; |
| 70 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles); | 76 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles); |
| 71 if (result == MOJO_RESULT_OK) { | 77 if (result == MOJO_RESULT_OK) { |
| 72 result = MojoWriteMessage(handle(), | 78 result = MojoWriteMessage(handle(), |
| 73 message->data(), | 79 message->data(), |
| (...skipping 30 matching lines...) Expand all Loading... |
| 104 } | 110 } |
| 105 | 111 |
| 106 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 112 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 107 "MessagePipeReader::OnMessageReceived", | 113 "MessagePipeReader::OnMessageReceived", |
| 108 message.flags(), | 114 message.flags(), |
| 109 TRACE_EVENT_FLAG_FLOW_IN); | 115 TRACE_EVENT_FLAG_FLOW_IN); |
| 110 delegate_->OnMessageReceived(message); | 116 delegate_->OnMessageReceived(message); |
| 111 } | 117 } |
| 112 | 118 |
| 113 void MessagePipeReader::OnPipeClosed() { | 119 void MessagePipeReader::OnPipeClosed() { |
| 120 DCHECK(thread_checker_.CalledOnValidThread()); |
| 114 if (!delegate_) | 121 if (!delegate_) |
| 115 return; | 122 return; |
| 116 delegate_->OnPipeClosed(this); | 123 delegate_->OnPipeClosed(this); |
| 117 delegate_ = nullptr; | 124 delegate_ = nullptr; |
| 118 } | 125 } |
| 119 | 126 |
| 120 void MessagePipeReader::OnPipeError(MojoResult error) { | 127 void MessagePipeReader::OnPipeError(MojoResult error) { |
| 128 DCHECK(thread_checker_.CalledOnValidThread()); |
| 121 if (!delegate_) | 129 if (!delegate_) |
| 122 return; | 130 return; |
| 123 delegate_->OnPipeError(this); | 131 delegate_->OnPipeError(this); |
| 124 } | 132 } |
| 125 | 133 |
| 126 MojoResult MessagePipeReader::ReadMessageBytes() { | 134 MojoResult MessagePipeReader::ReadMessageBytes() { |
| 135 DCHECK(thread_checker_.CalledOnValidThread()); |
| 127 DCHECK(handle_buffer_.empty()); | 136 DCHECK(handle_buffer_.empty()); |
| 128 | 137 |
| 129 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); | 138 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); |
| 130 uint32_t num_handles = 0; | 139 uint32_t num_handles = 0; |
| 131 MojoResult result = MojoReadMessage(pipe_.get().value(), | 140 MojoResult result = MojoReadMessage(pipe_.get().value(), |
| 132 num_bytes ? &data_buffer_[0] : nullptr, | 141 num_bytes ? &data_buffer_[0] : nullptr, |
| 133 &num_bytes, | 142 &num_bytes, |
| 134 nullptr, | 143 nullptr, |
| 135 &num_handles, | 144 &num_handles, |
| 136 MOJO_READ_MESSAGE_FLAG_NONE); | 145 MOJO_READ_MESSAGE_FLAG_NONE); |
| 137 data_buffer_.resize(num_bytes); | 146 data_buffer_.resize(num_bytes); |
| 138 handle_buffer_.resize(num_handles); | 147 handle_buffer_.resize(num_handles); |
| 139 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { | 148 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { |
| 140 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that | 149 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that |
| 141 // it needs more bufer. So we re-read it with resized buffers. | 150 // it needs more bufer. So we re-read it with resized buffers. |
| 142 result = MojoReadMessage(pipe_.get().value(), | 151 result = MojoReadMessage(pipe_.get().value(), |
| 143 num_bytes ? &data_buffer_[0] : nullptr, | 152 num_bytes ? &data_buffer_[0] : nullptr, |
| 144 &num_bytes, | 153 &num_bytes, |
| 145 num_handles ? &handle_buffer_[0] : nullptr, | 154 num_handles ? &handle_buffer_[0] : nullptr, |
| 146 &num_handles, | 155 &num_handles, |
| 147 MOJO_READ_MESSAGE_FLAG_NONE); | 156 MOJO_READ_MESSAGE_FLAG_NONE); |
| 148 } | 157 } |
| 149 | 158 |
| 150 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); | 159 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); |
| 151 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); | 160 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); |
| 152 return result; | 161 return result; |
| 153 } | 162 } |
| 154 | 163 |
| 155 void MessagePipeReader::ReadAvailableMessages() { | 164 void MessagePipeReader::ReadAvailableMessages() { |
| 165 DCHECK(thread_checker_.CalledOnValidThread()); |
| 156 while (pipe_.is_valid()) { | 166 while (pipe_.is_valid()) { |
| 157 MojoResult read_result = ReadMessageBytes(); | 167 MojoResult read_result = ReadMessageBytes(); |
| 158 if (read_result == MOJO_RESULT_SHOULD_WAIT) | 168 if (read_result == MOJO_RESULT_SHOULD_WAIT) |
| 159 break; | 169 break; |
| 160 if (read_result != MOJO_RESULT_OK) { | 170 if (read_result != MOJO_RESULT_OK) { |
| 161 DLOG(WARNING) | 171 DLOG(WARNING) |
| 162 << "Pipe got error from ReadMessage(). Closing: " << read_result; | 172 << "Pipe got error from ReadMessage(). Closing: " << read_result; |
| 163 OnPipeError(read_result); | 173 OnPipeError(read_result); |
| 164 Close(); | 174 Close(); |
| 165 break; | 175 break; |
| 166 } | 176 } |
| 167 | 177 |
| 168 OnMessageReceived(); | 178 OnMessageReceived(); |
| 169 } | 179 } |
| 170 | 180 |
| 171 } | 181 } |
| 172 | 182 |
| 173 void MessagePipeReader::ReadMessagesThenWait() { | 183 void MessagePipeReader::ReadMessagesThenWait() { |
| 184 DCHECK(thread_checker_.CalledOnValidThread()); |
| 174 while (true) { | 185 while (true) { |
| 175 ReadAvailableMessages(); | 186 ReadAvailableMessages(); |
| 176 if (!pipe_.is_valid()) | 187 if (!pipe_.is_valid()) |
| 177 break; | 188 break; |
| 178 // |Wait()| is safe to call only after all messages are read. | 189 // |Wait()| is safe to call only after all messages are read. |
| 179 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise. | 190 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise. |
| 180 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in | 191 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in |
| 181 // MessagePipe. | 192 // MessagePipe. |
| 182 MojoResult result = | 193 MojoResult result = |
| 183 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE); | 194 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE); |
| 184 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages | 195 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages |
| 185 // that have been arrived after the last |ReadAvailableMessages()|. | 196 // that have been arrived after the last |ReadAvailableMessages()|. |
| 186 // We have to consume then and retry in that case. | 197 // We have to consume then and retry in that case. |
| 187 if (result != MOJO_RESULT_ALREADY_EXISTS) { | 198 if (result != MOJO_RESULT_ALREADY_EXISTS) { |
| 188 if (result != MOJO_RESULT_OK) { | 199 if (result != MOJO_RESULT_OK) { |
| 189 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result; | 200 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result; |
| 190 OnPipeError(result); | 201 OnPipeError(result); |
| 191 Close(); | 202 Close(); |
| 192 } | 203 } |
| 193 | 204 |
| 194 break; | 205 break; |
| 195 } | 206 } |
| 196 } | 207 } |
| 197 } | 208 } |
| 198 | 209 |
| 199 void MessagePipeReader::PipeIsReady(MojoResult wait_result) { | 210 void MessagePipeReader::PipeIsReady(MojoResult wait_result) { |
| 211 DCHECK(thread_checker_.CalledOnValidThread()); |
| 200 CloseWithErrorIfPending(); | 212 CloseWithErrorIfPending(); |
| 201 if (!IsValid()) { | 213 if (!IsValid()) { |
| 202 // There was a pending error and it closed the pipe. | 214 // There was a pending error and it closed the pipe. |
| 203 // We cannot do the work anymore. | 215 // We cannot do the work anymore. |
| 204 return; | 216 return; |
| 205 } | 217 } |
| 206 | 218 |
| 207 if (wait_result != MOJO_RESULT_OK) { | 219 if (wait_result != MOJO_RESULT_OK) { |
| 208 if (wait_result != MOJO_RESULT_ABORTED) { | 220 if (wait_result != MOJO_RESULT_ABORTED) { |
| 209 // FAILED_PRECONDITION happens every time the peer is dead so | 221 // FAILED_PRECONDITION happens every time the peer is dead so |
| (...skipping 12 matching lines...) Expand all Loading... |
| 222 | 234 |
| 223 void MessagePipeReader::DelayedDeleter::operator()( | 235 void MessagePipeReader::DelayedDeleter::operator()( |
| 224 MessagePipeReader* ptr) const { | 236 MessagePipeReader* ptr) const { |
| 225 ptr->Close(); | 237 ptr->Close(); |
| 226 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, | 238 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, |
| 227 base::Bind(&DeleteNow, ptr)); | 239 base::Bind(&DeleteNow, ptr)); |
| 228 } | 240 } |
| 229 | 241 |
| 230 } // namespace internal | 242 } // namespace internal |
| 231 } // namespace IPC | 243 } // namespace IPC |
| OLD | NEW |