OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/mojo/ipc_message_pipe_reader.h" | 5 #include "ipc/mojo/ipc_message_pipe_reader.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/bind_helpers.h" | 8 #include "base/bind_helpers.h" |
9 #include "base/location.h" | 9 #include "base/location.h" |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
11 #include "base/single_thread_task_runner.h" | 11 #include "base/single_thread_task_runner.h" |
12 #include "base/thread_task_runner_handle.h" | 12 #include "base/thread_task_runner_handle.h" |
13 #include "ipc/mojo/async_handle_waiter.h" | 13 #include "ipc/mojo/async_handle_waiter.h" |
14 #include "ipc/mojo/ipc_channel_mojo.h" | 14 #include "ipc/mojo/ipc_channel_mojo.h" |
15 | 15 |
16 namespace IPC { | 16 namespace IPC { |
17 namespace internal { | 17 namespace internal { |
18 | 18 |
19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle, | 19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle, |
20 MessagePipeReader::Delegate* delegate) | 20 MessagePipeReader::Delegate* delegate) |
21 : pipe_(handle.Pass()), | 21 : pipe_(handle.Pass()), |
| 22 handle_copy_(pipe_.get().value()), |
22 delegate_(delegate), | 23 delegate_(delegate), |
23 async_waiter_( | 24 async_waiter_( |
24 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady, | 25 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady, |
25 base::Unretained(this)))), | 26 base::Unretained(this)))), |
26 pending_send_error_(MOJO_RESULT_OK) { | 27 pending_send_error_(MOJO_RESULT_OK) { |
27 } | 28 } |
28 | 29 |
29 MessagePipeReader::~MessagePipeReader() { | 30 MessagePipeReader::~MessagePipeReader() { |
| 31 DCHECK(thread_checker_.CalledOnValidThread()); |
30 // The pipe should be closed before deletion. | 32 // The pipe should be closed before deletion. |
31 CHECK(!IsValid()); | 33 CHECK(!IsValid()); |
32 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK); | |
33 } | 34 } |
34 | 35 |
35 void MessagePipeReader::Close() { | 36 void MessagePipeReader::Close() { |
36 // All pending errors should be signaled before Close(). | 37 DCHECK(thread_checker_.CalledOnValidThread()); |
37 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK); | |
38 async_waiter_.reset(); | 38 async_waiter_.reset(); |
39 pipe_.reset(); | 39 pipe_.reset(); |
40 OnPipeClosed(); | 40 OnPipeClosed(); |
41 } | 41 } |
42 | 42 |
43 void MessagePipeReader::CloseWithError(MojoResult error) { | 43 void MessagePipeReader::CloseWithError(MojoResult error) { |
| 44 DCHECK(thread_checker_.CalledOnValidThread()); |
44 OnPipeError(error); | 45 OnPipeError(error); |
45 Close(); | 46 Close(); |
46 } | 47 } |
47 | 48 |
48 void MessagePipeReader::CloseWithErrorIfPending() { | 49 void MessagePipeReader::CloseWithErrorIfPending() { |
49 if (pending_send_error_ == MOJO_RESULT_OK) | 50 DCHECK(thread_checker_.CalledOnValidThread()); |
| 51 MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_); |
| 52 if (pending_error == MOJO_RESULT_OK) |
50 return; | 53 return; |
51 MojoResult error = pending_send_error_; | 54 // NOTE: This races with Send(), and therefore the value of |
52 pending_send_error_ = MOJO_RESULT_OK; | 55 // pending_send_error() can change. |
53 CloseWithError(error); | 56 CloseWithError(pending_error); |
54 return; | 57 return; |
55 } | 58 } |
56 | 59 |
57 void MessagePipeReader::CloseWithErrorLater(MojoResult error) { | 60 void MessagePipeReader::CloseWithErrorLater(MojoResult error) { |
58 pending_send_error_ = error; | 61 DCHECK_NE(error, MOJO_RESULT_OK); |
| 62 // NOTE: No assumptions about the value of |pending_send_error_| or whether or |
| 63 // not the error has been signaled can be made. If Send() is called |
| 64 // immediately before Close() and errors, it's possible for the error to not |
| 65 // be signaled. |
| 66 base::subtle::NoBarrier_Store(&pending_send_error_, error); |
59 } | 67 } |
60 | 68 |
61 bool MessagePipeReader::Send(scoped_ptr<Message> message) { | 69 bool MessagePipeReader::Send(scoped_ptr<Message> message) { |
62 DCHECK(IsValid()); | |
63 | |
64 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 70 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
65 "MessagePipeReader::Send", | 71 "MessagePipeReader::Send", |
66 message->flags(), | 72 message->flags(), |
67 TRACE_EVENT_FLAG_FLOW_OUT); | 73 TRACE_EVENT_FLAG_FLOW_OUT); |
68 std::vector<MojoHandle> handles; | 74 std::vector<MojoHandle> handles; |
69 MojoResult result = MOJO_RESULT_OK; | 75 MojoResult result = MOJO_RESULT_OK; |
70 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles); | 76 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles); |
71 if (result == MOJO_RESULT_OK) { | 77 if (result == MOJO_RESULT_OK) { |
72 result = MojoWriteMessage(handle(), | 78 result = MojoWriteMessage(handle(), |
73 message->data(), | 79 message->data(), |
(...skipping 30 matching lines...) Expand all Loading... |
104 } | 110 } |
105 | 111 |
106 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 112 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
107 "MessagePipeReader::OnMessageReceived", | 113 "MessagePipeReader::OnMessageReceived", |
108 message.flags(), | 114 message.flags(), |
109 TRACE_EVENT_FLAG_FLOW_IN); | 115 TRACE_EVENT_FLAG_FLOW_IN); |
110 delegate_->OnMessageReceived(message); | 116 delegate_->OnMessageReceived(message); |
111 } | 117 } |
112 | 118 |
113 void MessagePipeReader::OnPipeClosed() { | 119 void MessagePipeReader::OnPipeClosed() { |
| 120 DCHECK(thread_checker_.CalledOnValidThread()); |
114 if (!delegate_) | 121 if (!delegate_) |
115 return; | 122 return; |
116 delegate_->OnPipeClosed(this); | 123 delegate_->OnPipeClosed(this); |
117 delegate_ = nullptr; | 124 delegate_ = nullptr; |
118 } | 125 } |
119 | 126 |
120 void MessagePipeReader::OnPipeError(MojoResult error) { | 127 void MessagePipeReader::OnPipeError(MojoResult error) { |
| 128 DCHECK(thread_checker_.CalledOnValidThread()); |
121 if (!delegate_) | 129 if (!delegate_) |
122 return; | 130 return; |
123 delegate_->OnPipeError(this); | 131 delegate_->OnPipeError(this); |
124 } | 132 } |
125 | 133 |
126 MojoResult MessagePipeReader::ReadMessageBytes() { | 134 MojoResult MessagePipeReader::ReadMessageBytes() { |
| 135 DCHECK(thread_checker_.CalledOnValidThread()); |
127 DCHECK(handle_buffer_.empty()); | 136 DCHECK(handle_buffer_.empty()); |
128 | 137 |
129 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); | 138 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); |
130 uint32_t num_handles = 0; | 139 uint32_t num_handles = 0; |
131 MojoResult result = MojoReadMessage(pipe_.get().value(), | 140 MojoResult result = MojoReadMessage(pipe_.get().value(), |
132 num_bytes ? &data_buffer_[0] : nullptr, | 141 num_bytes ? &data_buffer_[0] : nullptr, |
133 &num_bytes, | 142 &num_bytes, |
134 nullptr, | 143 nullptr, |
135 &num_handles, | 144 &num_handles, |
136 MOJO_READ_MESSAGE_FLAG_NONE); | 145 MOJO_READ_MESSAGE_FLAG_NONE); |
137 data_buffer_.resize(num_bytes); | 146 data_buffer_.resize(num_bytes); |
138 handle_buffer_.resize(num_handles); | 147 handle_buffer_.resize(num_handles); |
139 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { | 148 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { |
140 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that | 149 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that |
141 // it needs more bufer. So we re-read it with resized buffers. | 150 // it needs more bufer. So we re-read it with resized buffers. |
142 result = MojoReadMessage(pipe_.get().value(), | 151 result = MojoReadMessage(pipe_.get().value(), |
143 num_bytes ? &data_buffer_[0] : nullptr, | 152 num_bytes ? &data_buffer_[0] : nullptr, |
144 &num_bytes, | 153 &num_bytes, |
145 num_handles ? &handle_buffer_[0] : nullptr, | 154 num_handles ? &handle_buffer_[0] : nullptr, |
146 &num_handles, | 155 &num_handles, |
147 MOJO_READ_MESSAGE_FLAG_NONE); | 156 MOJO_READ_MESSAGE_FLAG_NONE); |
148 } | 157 } |
149 | 158 |
150 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); | 159 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); |
151 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); | 160 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); |
152 return result; | 161 return result; |
153 } | 162 } |
154 | 163 |
155 void MessagePipeReader::ReadAvailableMessages() { | 164 void MessagePipeReader::ReadAvailableMessages() { |
| 165 DCHECK(thread_checker_.CalledOnValidThread()); |
156 while (pipe_.is_valid()) { | 166 while (pipe_.is_valid()) { |
157 MojoResult read_result = ReadMessageBytes(); | 167 MojoResult read_result = ReadMessageBytes(); |
158 if (read_result == MOJO_RESULT_SHOULD_WAIT) | 168 if (read_result == MOJO_RESULT_SHOULD_WAIT) |
159 break; | 169 break; |
160 if (read_result != MOJO_RESULT_OK) { | 170 if (read_result != MOJO_RESULT_OK) { |
161 DLOG(WARNING) | 171 DLOG(WARNING) |
162 << "Pipe got error from ReadMessage(). Closing: " << read_result; | 172 << "Pipe got error from ReadMessage(). Closing: " << read_result; |
163 OnPipeError(read_result); | 173 OnPipeError(read_result); |
164 Close(); | 174 Close(); |
165 break; | 175 break; |
166 } | 176 } |
167 | 177 |
168 OnMessageReceived(); | 178 OnMessageReceived(); |
169 } | 179 } |
170 | 180 |
171 } | 181 } |
172 | 182 |
173 void MessagePipeReader::ReadMessagesThenWait() { | 183 void MessagePipeReader::ReadMessagesThenWait() { |
| 184 DCHECK(thread_checker_.CalledOnValidThread()); |
174 while (true) { | 185 while (true) { |
175 ReadAvailableMessages(); | 186 ReadAvailableMessages(); |
176 if (!pipe_.is_valid()) | 187 if (!pipe_.is_valid()) |
177 break; | 188 break; |
178 // |Wait()| is safe to call only after all messages are read. | 189 // |Wait()| is safe to call only after all messages are read. |
179 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise. | 190 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise. |
180 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in | 191 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in |
181 // MessagePipe. | 192 // MessagePipe. |
182 MojoResult result = | 193 MojoResult result = |
183 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE); | 194 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE); |
184 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages | 195 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages |
185 // that have been arrived after the last |ReadAvailableMessages()|. | 196 // that have been arrived after the last |ReadAvailableMessages()|. |
186 // We have to consume then and retry in that case. | 197 // We have to consume then and retry in that case. |
187 if (result != MOJO_RESULT_ALREADY_EXISTS) { | 198 if (result != MOJO_RESULT_ALREADY_EXISTS) { |
188 if (result != MOJO_RESULT_OK) { | 199 if (result != MOJO_RESULT_OK) { |
189 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result; | 200 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result; |
190 OnPipeError(result); | 201 OnPipeError(result); |
191 Close(); | 202 Close(); |
192 } | 203 } |
193 | 204 |
194 break; | 205 break; |
195 } | 206 } |
196 } | 207 } |
197 } | 208 } |
198 | 209 |
199 void MessagePipeReader::PipeIsReady(MojoResult wait_result) { | 210 void MessagePipeReader::PipeIsReady(MojoResult wait_result) { |
| 211 DCHECK(thread_checker_.CalledOnValidThread()); |
200 CloseWithErrorIfPending(); | 212 CloseWithErrorIfPending(); |
201 if (!IsValid()) { | 213 if (!IsValid()) { |
202 // There was a pending error and it closed the pipe. | 214 // There was a pending error and it closed the pipe. |
203 // We cannot do the work anymore. | 215 // We cannot do the work anymore. |
204 return; | 216 return; |
205 } | 217 } |
206 | 218 |
207 if (wait_result != MOJO_RESULT_OK) { | 219 if (wait_result != MOJO_RESULT_OK) { |
208 if (wait_result != MOJO_RESULT_ABORTED) { | 220 if (wait_result != MOJO_RESULT_ABORTED) { |
209 // FAILED_PRECONDITION happens every time the peer is dead so | 221 // FAILED_PRECONDITION happens every time the peer is dead so |
(...skipping 12 matching lines...) Expand all Loading... |
222 | 234 |
223 void MessagePipeReader::DelayedDeleter::operator()( | 235 void MessagePipeReader::DelayedDeleter::operator()( |
224 MessagePipeReader* ptr) const { | 236 MessagePipeReader* ptr) const { |
225 ptr->Close(); | 237 ptr->Close(); |
226 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, | 238 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, |
227 base::Bind(&DeleteNow, ptr)); | 239 base::Bind(&DeleteNow, ptr)); |
228 } | 240 } |
229 | 241 |
230 } // namespace internal | 242 } // namespace internal |
231 } // namespace IPC | 243 } // namespace IPC |
OLD | NEW |