Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(173)

Side by Side Diff: ipc/mojo/ipc_message_pipe_reader.cc

Issue 1318453002: Fix races with MessagePipeReader due to the Mojo IPC channel being thread-safe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Add bug reference. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/mojo/ipc_message_pipe_reader.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/mojo/ipc_message_pipe_reader.h" 5 #include "ipc/mojo/ipc_message_pipe_reader.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/bind_helpers.h" 8 #include "base/bind_helpers.h"
9 #include "base/location.h" 9 #include "base/location.h"
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h" 11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h" 12 #include "base/thread_task_runner_handle.h"
13 #include "ipc/mojo/async_handle_waiter.h" 13 #include "ipc/mojo/async_handle_waiter.h"
14 #include "ipc/mojo/ipc_channel_mojo.h" 14 #include "ipc/mojo/ipc_channel_mojo.h"
15 15
16 namespace IPC { 16 namespace IPC {
17 namespace internal { 17 namespace internal {
18 18
19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle, 19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
20 MessagePipeReader::Delegate* delegate) 20 MessagePipeReader::Delegate* delegate)
21 : pipe_(handle.Pass()), 21 : pipe_(handle.Pass()),
22 handle_copy_(pipe_.get().value()),
22 delegate_(delegate), 23 delegate_(delegate),
23 async_waiter_( 24 async_waiter_(
24 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady, 25 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
25 base::Unretained(this)))), 26 base::Unretained(this)))),
26 pending_send_error_(MOJO_RESULT_OK) { 27 pending_send_error_(MOJO_RESULT_OK) {
27 } 28 }
28 29
29 MessagePipeReader::~MessagePipeReader() { 30 MessagePipeReader::~MessagePipeReader() {
31 DCHECK(thread_checker_.CalledOnValidThread());
30 // The pipe should be closed before deletion. 32 // The pipe should be closed before deletion.
31 CHECK(!IsValid()); 33 CHECK(!IsValid());
32 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
33 } 34 }
34 35
35 void MessagePipeReader::Close() { 36 void MessagePipeReader::Close() {
36 // All pending errors should be signaled before Close(). 37 DCHECK(thread_checker_.CalledOnValidThread());
37 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
38 async_waiter_.reset(); 38 async_waiter_.reset();
39 pipe_.reset(); 39 pipe_.reset();
40 OnPipeClosed(); 40 OnPipeClosed();
41 } 41 }
42 42
43 void MessagePipeReader::CloseWithError(MojoResult error) { 43 void MessagePipeReader::CloseWithError(MojoResult error) {
44 DCHECK(thread_checker_.CalledOnValidThread());
44 OnPipeError(error); 45 OnPipeError(error);
45 Close(); 46 Close();
46 } 47 }
47 48
48 void MessagePipeReader::CloseWithErrorIfPending() { 49 void MessagePipeReader::CloseWithErrorIfPending() {
49 if (pending_send_error_ == MOJO_RESULT_OK) 50 DCHECK(thread_checker_.CalledOnValidThread());
51 MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_);
52 if (pending_error == MOJO_RESULT_OK)
50 return; 53 return;
51 MojoResult error = pending_send_error_; 54 // NOTE: This races with Send(), and therefore the value of
52 pending_send_error_ = MOJO_RESULT_OK; 55 // pending_send_error() can change.
53 CloseWithError(error); 56 CloseWithError(pending_error);
54 return; 57 return;
55 } 58 }
56 59
57 void MessagePipeReader::CloseWithErrorLater(MojoResult error) { 60 void MessagePipeReader::CloseWithErrorLater(MojoResult error) {
58 pending_send_error_ = error; 61 DCHECK_NE(error, MOJO_RESULT_OK);
62 // NOTE: No assumptions about the value of |pending_send_error_| or whether or
63 // not the error has been signaled can be made. If Send() is called
64 // immediately before Close() and errors, it's possible for the error to not
65 // be signaled.
66 base::subtle::NoBarrier_Store(&pending_send_error_, error);
59 } 67 }
60 68
61 bool MessagePipeReader::Send(scoped_ptr<Message> message) { 69 bool MessagePipeReader::Send(scoped_ptr<Message> message) {
62 DCHECK(IsValid());
63
64 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 70 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
65 "MessagePipeReader::Send", 71 "MessagePipeReader::Send",
66 message->flags(), 72 message->flags(),
67 TRACE_EVENT_FLAG_FLOW_OUT); 73 TRACE_EVENT_FLAG_FLOW_OUT);
68 std::vector<MojoHandle> handles; 74 std::vector<MojoHandle> handles;
69 MojoResult result = MOJO_RESULT_OK; 75 MojoResult result = MOJO_RESULT_OK;
70 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles); 76 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
71 if (result == MOJO_RESULT_OK) { 77 if (result == MOJO_RESULT_OK) {
72 result = MojoWriteMessage(handle(), 78 result = MojoWriteMessage(handle(),
73 message->data(), 79 message->data(),
(...skipping 30 matching lines...) Expand all
104 } 110 }
105 111
106 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 112 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
107 "MessagePipeReader::OnMessageReceived", 113 "MessagePipeReader::OnMessageReceived",
108 message.flags(), 114 message.flags(),
109 TRACE_EVENT_FLAG_FLOW_IN); 115 TRACE_EVENT_FLAG_FLOW_IN);
110 delegate_->OnMessageReceived(message); 116 delegate_->OnMessageReceived(message);
111 } 117 }
112 118
113 void MessagePipeReader::OnPipeClosed() { 119 void MessagePipeReader::OnPipeClosed() {
120 DCHECK(thread_checker_.CalledOnValidThread());
114 if (!delegate_) 121 if (!delegate_)
115 return; 122 return;
116 delegate_->OnPipeClosed(this); 123 delegate_->OnPipeClosed(this);
117 delegate_ = nullptr; 124 delegate_ = nullptr;
118 } 125 }
119 126
120 void MessagePipeReader::OnPipeError(MojoResult error) { 127 void MessagePipeReader::OnPipeError(MojoResult error) {
128 DCHECK(thread_checker_.CalledOnValidThread());
121 if (!delegate_) 129 if (!delegate_)
122 return; 130 return;
123 delegate_->OnPipeError(this); 131 delegate_->OnPipeError(this);
124 } 132 }
125 133
126 MojoResult MessagePipeReader::ReadMessageBytes() { 134 MojoResult MessagePipeReader::ReadMessageBytes() {
135 DCHECK(thread_checker_.CalledOnValidThread());
127 DCHECK(handle_buffer_.empty()); 136 DCHECK(handle_buffer_.empty());
128 137
129 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); 138 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
130 uint32_t num_handles = 0; 139 uint32_t num_handles = 0;
131 MojoResult result = MojoReadMessage(pipe_.get().value(), 140 MojoResult result = MojoReadMessage(pipe_.get().value(),
132 num_bytes ? &data_buffer_[0] : nullptr, 141 num_bytes ? &data_buffer_[0] : nullptr,
133 &num_bytes, 142 &num_bytes,
134 nullptr, 143 nullptr,
135 &num_handles, 144 &num_handles,
136 MOJO_READ_MESSAGE_FLAG_NONE); 145 MOJO_READ_MESSAGE_FLAG_NONE);
137 data_buffer_.resize(num_bytes); 146 data_buffer_.resize(num_bytes);
138 handle_buffer_.resize(num_handles); 147 handle_buffer_.resize(num_handles);
139 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { 148 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
140 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that 149 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
141 // it needs more bufer. So we re-read it with resized buffers. 150 // it needs more bufer. So we re-read it with resized buffers.
142 result = MojoReadMessage(pipe_.get().value(), 151 result = MojoReadMessage(pipe_.get().value(),
143 num_bytes ? &data_buffer_[0] : nullptr, 152 num_bytes ? &data_buffer_[0] : nullptr,
144 &num_bytes, 153 &num_bytes,
145 num_handles ? &handle_buffer_[0] : nullptr, 154 num_handles ? &handle_buffer_[0] : nullptr,
146 &num_handles, 155 &num_handles,
147 MOJO_READ_MESSAGE_FLAG_NONE); 156 MOJO_READ_MESSAGE_FLAG_NONE);
148 } 157 }
149 158
150 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); 159 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
151 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); 160 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
152 return result; 161 return result;
153 } 162 }
154 163
155 void MessagePipeReader::ReadAvailableMessages() { 164 void MessagePipeReader::ReadAvailableMessages() {
165 DCHECK(thread_checker_.CalledOnValidThread());
156 while (pipe_.is_valid()) { 166 while (pipe_.is_valid()) {
157 MojoResult read_result = ReadMessageBytes(); 167 MojoResult read_result = ReadMessageBytes();
158 if (read_result == MOJO_RESULT_SHOULD_WAIT) 168 if (read_result == MOJO_RESULT_SHOULD_WAIT)
159 break; 169 break;
160 if (read_result != MOJO_RESULT_OK) { 170 if (read_result != MOJO_RESULT_OK) {
161 DLOG(WARNING) 171 DLOG(WARNING)
162 << "Pipe got error from ReadMessage(). Closing: " << read_result; 172 << "Pipe got error from ReadMessage(). Closing: " << read_result;
163 OnPipeError(read_result); 173 OnPipeError(read_result);
164 Close(); 174 Close();
165 break; 175 break;
166 } 176 }
167 177
168 OnMessageReceived(); 178 OnMessageReceived();
169 } 179 }
170 180
171 } 181 }
172 182
173 void MessagePipeReader::ReadMessagesThenWait() { 183 void MessagePipeReader::ReadMessagesThenWait() {
184 DCHECK(thread_checker_.CalledOnValidThread());
174 while (true) { 185 while (true) {
175 ReadAvailableMessages(); 186 ReadAvailableMessages();
176 if (!pipe_.is_valid()) 187 if (!pipe_.is_valid())
177 break; 188 break;
178 // |Wait()| is safe to call only after all messages are read. 189 // |Wait()| is safe to call only after all messages are read.
179 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise. 190 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
180 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in 191 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
181 // MessagePipe. 192 // MessagePipe.
182 MojoResult result = 193 MojoResult result =
183 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE); 194 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
184 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages 195 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
185 // that have been arrived after the last |ReadAvailableMessages()|. 196 // that have been arrived after the last |ReadAvailableMessages()|.
186 // We have to consume then and retry in that case. 197 // We have to consume then and retry in that case.
187 if (result != MOJO_RESULT_ALREADY_EXISTS) { 198 if (result != MOJO_RESULT_ALREADY_EXISTS) {
188 if (result != MOJO_RESULT_OK) { 199 if (result != MOJO_RESULT_OK) {
189 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result; 200 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result;
190 OnPipeError(result); 201 OnPipeError(result);
191 Close(); 202 Close();
192 } 203 }
193 204
194 break; 205 break;
195 } 206 }
196 } 207 }
197 } 208 }
198 209
199 void MessagePipeReader::PipeIsReady(MojoResult wait_result) { 210 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
211 DCHECK(thread_checker_.CalledOnValidThread());
200 CloseWithErrorIfPending(); 212 CloseWithErrorIfPending();
201 if (!IsValid()) { 213 if (!IsValid()) {
202 // There was a pending error and it closed the pipe. 214 // There was a pending error and it closed the pipe.
203 // We cannot do the work anymore. 215 // We cannot do the work anymore.
204 return; 216 return;
205 } 217 }
206 218
207 if (wait_result != MOJO_RESULT_OK) { 219 if (wait_result != MOJO_RESULT_OK) {
208 if (wait_result != MOJO_RESULT_ABORTED) { 220 if (wait_result != MOJO_RESULT_ABORTED) {
209 // FAILED_PRECONDITION happens every time the peer is dead so 221 // FAILED_PRECONDITION happens every time the peer is dead so
(...skipping 12 matching lines...) Expand all
222 234
223 void MessagePipeReader::DelayedDeleter::operator()( 235 void MessagePipeReader::DelayedDeleter::operator()(
224 MessagePipeReader* ptr) const { 236 MessagePipeReader* ptr) const {
225 ptr->Close(); 237 ptr->Close();
226 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, 238 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
227 base::Bind(&DeleteNow, ptr)); 239 base::Bind(&DeleteNow, ptr));
228 } 240 }
229 241
230 } // namespace internal 242 } // namespace internal
231 } // namespace IPC 243 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/mojo/ipc_message_pipe_reader.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698