Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(362)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 1455063004: Mojo C++ bindings: introduce MultiplexRouter and related classes. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/connector.h" 5 #include "mojo/public/cpp/bindings/lib/connector.h"
6 6
7 #include "mojo/public/cpp/environment/logging.h" 7 #include "base/logging.h"
8 #include "base/macros.h"
9 #include "base/synchronization/lock.h"
8 10
9 namespace mojo { 11 namespace mojo {
10 namespace internal { 12 namespace internal {
11 13
14 namespace {
15
16 // Similar to base::AutoLock, except that it does nothing if |lock| passed into
17 // the constructor is null.
18 class MayAutoLock {
19 public:
20 explicit MayAutoLock(base::Lock* lock) : lock_(lock) {
21 if (lock_)
22 lock_->Acquire();
23 }
24
25 ~MayAutoLock() {
26 if (lock_) {
27 lock_->AssertAcquired();
28 lock_->Release();
29 }
30 }
31
32 private:
33 base::Lock* lock_;
34 DISALLOW_COPY_AND_ASSIGN(MayAutoLock);
35 };
36
37 } // namespace
38
12 // ---------------------------------------------------------------------------- 39 // ----------------------------------------------------------------------------
13 40
14 Connector::Connector(ScopedMessagePipeHandle message_pipe, 41 Connector::Connector(ScopedMessagePipeHandle message_pipe,
42 ConnectorConfig config,
15 const MojoAsyncWaiter* waiter) 43 const MojoAsyncWaiter* waiter)
16 : waiter_(waiter), 44 : waiter_(waiter),
17 message_pipe_(message_pipe.Pass()), 45 message_pipe_(message_pipe.Pass()),
18 incoming_receiver_(nullptr), 46 incoming_receiver_(nullptr),
19 async_wait_id_(0), 47 async_wait_id_(0),
20 error_(false), 48 error_(false),
21 drop_writes_(false), 49 drop_writes_(false),
22 enforce_errors_from_incoming_receiver_(true), 50 enforce_errors_from_incoming_receiver_(true),
23 paused_(false), 51 paused_(false),
24 destroyed_flag_(nullptr) { 52 destroyed_flag_(nullptr),
53 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) {
25 // Even though we don't have an incoming receiver, we still want to monitor 54 // Even though we don't have an incoming receiver, we still want to monitor
26 // the message pipe to know if is closed or encounters an error. 55 // the message pipe to know if is closed or encounters an error.
27 WaitToReadMore(); 56 WaitToReadMore();
28 } 57 }
29 58
30 Connector::~Connector() { 59 Connector::~Connector() {
60 DCHECK(thread_checker_.CalledOnValidThread());
61
31 if (destroyed_flag_) 62 if (destroyed_flag_)
32 *destroyed_flag_ = true; 63 *destroyed_flag_ = true;
33 64
34 CancelWait(); 65 CancelWait();
35 } 66 }
36 67
37 void Connector::CloseMessagePipe() { 68 void Connector::CloseMessagePipe() {
69 DCHECK(thread_checker_.CalledOnValidThread());
70
38 CancelWait(); 71 CancelWait();
72 MayAutoLock locker(lock_.get());
39 Close(message_pipe_.Pass()); 73 Close(message_pipe_.Pass());
40 } 74 }
41 75
42 ScopedMessagePipeHandle Connector::PassMessagePipe() { 76 ScopedMessagePipeHandle Connector::PassMessagePipe() {
77 DCHECK(thread_checker_.CalledOnValidThread());
78
43 CancelWait(); 79 CancelWait();
80 MayAutoLock locker(lock_.get());
44 return message_pipe_.Pass(); 81 return message_pipe_.Pass();
45 } 82 }
46 83
47 void Connector::RaiseError() { 84 void Connector::RaiseError() {
85 DCHECK(thread_checker_.CalledOnValidThread());
86
48 HandleError(true, true); 87 HandleError(true, true);
49 } 88 }
50 89
51 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { 90 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
91 DCHECK(thread_checker_.CalledOnValidThread());
92
52 if (error_) 93 if (error_)
53 return false; 94 return false;
54 95
55 ResumeIncomingMethodCallProcessing(); 96 ResumeIncomingMethodCallProcessing();
56 97
57 MojoResult rv = 98 MojoResult rv =
58 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); 99 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
59 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED) 100 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED)
60 return false; 101 return false;
61 if (rv != MOJO_RESULT_OK) { 102 if (rv != MOJO_RESULT_OK) {
62 // Users that call WaitForIncomingMessage() should expect their code to be 103 // Users that call WaitForIncomingMessage() should expect their code to be
63 // re-entered, so we call the error handler synchronously. 104 // re-entered, so we call the error handler synchronously.
64 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); 105 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
65 return false; 106 return false;
66 } 107 }
67 mojo_ignore_result(ReadSingleMessage(&rv)); 108 mojo_ignore_result(ReadSingleMessage(&rv));
68 return (rv == MOJO_RESULT_OK); 109 return (rv == MOJO_RESULT_OK);
69 } 110 }
70 111
71 void Connector::PauseIncomingMethodCallProcessing() { 112 void Connector::PauseIncomingMethodCallProcessing() {
113 DCHECK(thread_checker_.CalledOnValidThread());
114
72 if (paused_) 115 if (paused_)
73 return; 116 return;
74 117
75 paused_ = true; 118 paused_ = true;
76 CancelWait(); 119 CancelWait();
77 } 120 }
78 121
79 void Connector::ResumeIncomingMethodCallProcessing() { 122 void Connector::ResumeIncomingMethodCallProcessing() {
123 DCHECK(thread_checker_.CalledOnValidThread());
124
80 if (!paused_) 125 if (!paused_)
81 return; 126 return;
82 127
83 paused_ = false; 128 paused_ = false;
84 WaitToReadMore(); 129 WaitToReadMore();
85 } 130 }
86 131
87 bool Connector::Accept(Message* message) { 132 bool Connector::Accept(Message* message) {
133 DCHECK(lock_ || thread_checker_.CalledOnValidThread());
134
135 // It shouldn't hurt even if |error_| may be changed by a different thread at
136 // the same time. The outcome is that we may write into |message_pipe_| after
137 // encountering an error, which should be fine.
88 if (error_) 138 if (error_)
89 return false; 139 return false;
90 140
91 MOJO_CHECK(message_pipe_.is_valid()); 141 MayAutoLock locker(lock_.get());
92 if (drop_writes_) 142
143 if (!message_pipe_.is_valid() || drop_writes_)
93 return true; 144 return true;
94 145
95 MojoResult rv = 146 MojoResult rv =
96 WriteMessageRaw(message_pipe_.get(), 147 WriteMessageRaw(message_pipe_.get(),
97 message->data(), 148 message->data(),
98 message->data_num_bytes(), 149 message->data_num_bytes(),
99 message->mutable_handles()->empty() 150 message->mutable_handles()->empty()
100 ? nullptr 151 ? nullptr
101 : reinterpret_cast<const MojoHandle*>( 152 : reinterpret_cast<const MojoHandle*>(
102 &message->mutable_handles()->front()), 153 &message->mutable_handles()->front()),
(...skipping 14 matching lines...) Expand all
117 drop_writes_ = true; 168 drop_writes_ = true;
118 break; 169 break;
119 case MOJO_RESULT_BUSY: 170 case MOJO_RESULT_BUSY:
120 // We'd get a "busy" result if one of the message's handles is: 171 // We'd get a "busy" result if one of the message's handles is:
121 // - |message_pipe_|'s own handle; 172 // - |message_pipe_|'s own handle;
122 // - simultaneously being used on another thread; or 173 // - simultaneously being used on another thread; or
123 // - in a "busy" state that prohibits it from being transferred (e.g., 174 // - in a "busy" state that prohibits it from being transferred (e.g.,
124 // a data pipe handle in the middle of a two-phase read/write, 175 // a data pipe handle in the middle of a two-phase read/write,
125 // regardless of which thread that two-phase read/write is happening 176 // regardless of which thread that two-phase read/write is happening
126 // on). 177 // on).
127 // TODO(vtl): I wonder if this should be a |MOJO_DCHECK()|. (But, until 178 // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
128 // crbug.com/389666, etc. are resolved, this will make tests fail quickly 179 // crbug.com/389666, etc. are resolved, this will make tests fail quickly
129 // rather than hanging.) 180 // rather than hanging.)
130 MOJO_CHECK(false) << "Race condition or other bug detected"; 181 CHECK(false) << "Race condition or other bug detected";
131 return false; 182 return false;
132 default: 183 default:
133 // This particular write was rejected, presumably because of bad input. 184 // This particular write was rejected, presumably because of bad input.
134 // The pipe is not necessarily in a bad state. 185 // The pipe is not necessarily in a bad state.
135 return false; 186 return false;
136 } 187 }
137 return true; 188 return true;
138 } 189 }
139 190
140 // static 191 // static
141 void Connector::CallOnHandleReady(void* closure, MojoResult result) { 192 void Connector::CallOnHandleReady(void* closure, MojoResult result) {
142 Connector* self = static_cast<Connector*>(closure); 193 Connector* self = static_cast<Connector*>(closure);
143 self->OnHandleReady(result); 194 self->OnHandleReady(result);
144 } 195 }
145 196
146 void Connector::OnHandleReady(MojoResult result) { 197 void Connector::OnHandleReady(MojoResult result) {
147 MOJO_CHECK(async_wait_id_ != 0); 198 DCHECK(thread_checker_.CalledOnValidThread());
199
200 CHECK(async_wait_id_ != 0);
148 async_wait_id_ = 0; 201 async_wait_id_ = 0;
149 if (result != MOJO_RESULT_OK) { 202 if (result != MOJO_RESULT_OK) {
150 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 203 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
151 return; 204 return;
152 } 205 }
153 ReadAllAvailableMessages(); 206 ReadAllAvailableMessages();
154 // At this point, this object might have been deleted. Return. 207 // At this point, this object might have been deleted. Return.
155 } 208 }
156 209
157 void Connector::WaitToReadMore() { 210 void Connector::WaitToReadMore() {
158 MOJO_CHECK(!async_wait_id_); 211 CHECK(!async_wait_id_);
159 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), 212 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
160 MOJO_HANDLE_SIGNAL_READABLE, 213 MOJO_HANDLE_SIGNAL_READABLE,
161 MOJO_DEADLINE_INDEFINITE, 214 MOJO_DEADLINE_INDEFINITE,
162 &Connector::CallOnHandleReady, 215 &Connector::CallOnHandleReady,
163 this); 216 this);
164 } 217 }
165 218
166 bool Connector::ReadSingleMessage(MojoResult* read_result) { 219 bool Connector::ReadSingleMessage(MojoResult* read_result) {
167 bool receiver_result = false; 220 bool receiver_result = false;
168 221
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
230 force_pipe_reset = true; 283 force_pipe_reset = true;
231 284
232 if (paused_) { 285 if (paused_) {
233 // If the user has paused receiving messages, we shouldn't call the error 286 // If the user has paused receiving messages, we shouldn't call the error
234 // handler right away. We need to wait until the user starts receiving 287 // handler right away. We need to wait until the user starts receiving
235 // messages again. 288 // messages again.
236 force_async_handler = true; 289 force_async_handler = true;
237 } 290 }
238 291
239 if (force_pipe_reset) { 292 if (force_pipe_reset) {
240 CloseMessagePipe(); 293 CancelWait();
294 MayAutoLock locker(lock_.get());
295 Close(message_pipe_.Pass());
241 MessagePipe dummy_pipe; 296 MessagePipe dummy_pipe;
242 message_pipe_ = dummy_pipe.handle0.Pass(); 297 message_pipe_ = dummy_pipe.handle0.Pass();
243 } else { 298 } else {
244 CancelWait(); 299 CancelWait();
245 } 300 }
246 301
247 if (force_async_handler) { 302 if (force_async_handler) {
248 // |dummy_pipe.handle1| has been destructed. Reading the pipe will 303 // |dummy_pipe.handle1| has been destructed. Reading the pipe will
249 // eventually cause a read error on |message_pipe_| and set error state. 304 // eventually cause a read error on |message_pipe_| and set error state.
250 if (!paused_) 305 if (!paused_)
251 WaitToReadMore(); 306 WaitToReadMore();
252 } else { 307 } else {
253 error_ = true; 308 error_ = true;
254 connection_error_handler_.Run(); 309 connection_error_handler_.Run();
255 } 310 }
256 } 311 }
257 312
258 } // namespace internal 313 } // namespace internal
259 } // namespace mojo 314 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/connector.h ('k') | mojo/public/cpp/bindings/lib/interface_endpoint_client.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698