Index: mojo/public/cpp/bindings/lib/connector.cc |
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc |
index 5bd94eb63ae94542a2d131affb8d97b6704e8e3b..af45bfc67f240a1b257c0b857c08550340879961 100644 |
--- a/mojo/public/cpp/bindings/lib/connector.cc |
+++ b/mojo/public/cpp/bindings/lib/connector.cc |
@@ -4,14 +4,42 @@ |
#include "mojo/public/cpp/bindings/lib/connector.h" |
-#include "mojo/public/cpp/environment/logging.h" |
+#include "base/logging.h" |
+#include "base/macros.h" |
+#include "base/synchronization/lock.h" |
namespace mojo { |
namespace internal { |
+namespace { |
+ |
+// Similar to base::AutoLock, except that it does nothing if |lock| passed into |
+// the constructor is null. |
+class MayAutoLock { |
+ public: |
+ explicit MayAutoLock(base::Lock* lock) : lock_(lock) { |
+ if (lock_) |
+ lock_->Acquire(); |
+ } |
+ |
+ ~MayAutoLock() { |
+ if (lock_) { |
+ lock_->AssertAcquired(); |
+ lock_->Release(); |
+ } |
+ } |
+ |
+ private: |
+ base::Lock* lock_; |
+ DISALLOW_COPY_AND_ASSIGN(MayAutoLock); |
+}; |
+ |
+} // namespace |
+ |
// ---------------------------------------------------------------------------- |
Connector::Connector(ScopedMessagePipeHandle message_pipe, |
+ ConnectorConfig config, |
const MojoAsyncWaiter* waiter) |
: waiter_(waiter), |
message_pipe_(message_pipe.Pass()), |
@@ -21,13 +49,16 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, |
drop_writes_(false), |
enforce_errors_from_incoming_receiver_(true), |
paused_(false), |
- destroyed_flag_(nullptr) { |
+ destroyed_flag_(nullptr), |
+ lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) { |
// Even though we don't have an incoming receiver, we still want to monitor |
// the message pipe to know if is closed or encounters an error. |
WaitToReadMore(); |
} |
Connector::~Connector() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
if (destroyed_flag_) |
*destroyed_flag_ = true; |
@@ -35,20 +66,30 @@ Connector::~Connector() { |
} |
void Connector::CloseMessagePipe() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
CancelWait(); |
+ MayAutoLock locker(lock_.get()); |
Close(message_pipe_.Pass()); |
} |
ScopedMessagePipeHandle Connector::PassMessagePipe() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
CancelWait(); |
+ MayAutoLock locker(lock_.get()); |
return message_pipe_.Pass(); |
} |
void Connector::RaiseError() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
HandleError(true, true); |
} |
bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
if (error_) |
return false; |
@@ -69,6 +110,8 @@ bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { |
} |
void Connector::PauseIncomingMethodCallProcessing() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
if (paused_) |
return; |
@@ -77,6 +120,8 @@ void Connector::PauseIncomingMethodCallProcessing() { |
} |
void Connector::ResumeIncomingMethodCallProcessing() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
if (!paused_) |
return; |
@@ -85,11 +130,17 @@ void Connector::ResumeIncomingMethodCallProcessing() { |
} |
bool Connector::Accept(Message* message) { |
+ DCHECK(lock_ || thread_checker_.CalledOnValidThread()); |
+ |
+ // It shouldn't hurt even if |error_| may be changed by a different thread at |
+ // the same time. The outcome is that we may write into |message_pipe_| after |
+ // encountering an error, which should be fine. |
if (error_) |
return false; |
- MOJO_CHECK(message_pipe_.is_valid()); |
- if (drop_writes_) |
+ MayAutoLock locker(lock_.get()); |
+ |
+ if (!message_pipe_.is_valid() || drop_writes_) |
return true; |
MojoResult rv = |
@@ -124,10 +175,10 @@ bool Connector::Accept(Message* message) { |
// a data pipe handle in the middle of a two-phase read/write, |
// regardless of which thread that two-phase read/write is happening |
// on). |
- // TODO(vtl): I wonder if this should be a |MOJO_DCHECK()|. (But, until |
+ // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until |
// crbug.com/389666, etc. are resolved, this will make tests fail quickly |
// rather than hanging.) |
- MOJO_CHECK(false) << "Race condition or other bug detected"; |
+ CHECK(false) << "Race condition or other bug detected"; |
return false; |
default: |
// This particular write was rejected, presumably because of bad input. |
@@ -144,7 +195,9 @@ void Connector::CallOnHandleReady(void* closure, MojoResult result) { |
} |
void Connector::OnHandleReady(MojoResult result) { |
- MOJO_CHECK(async_wait_id_ != 0); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ CHECK(async_wait_id_ != 0); |
async_wait_id_ = 0; |
if (result != MOJO_RESULT_OK) { |
HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
@@ -155,7 +208,7 @@ void Connector::OnHandleReady(MojoResult result) { |
} |
void Connector::WaitToReadMore() { |
- MOJO_CHECK(!async_wait_id_); |
+ CHECK(!async_wait_id_); |
async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), |
MOJO_HANDLE_SIGNAL_READABLE, |
MOJO_DEADLINE_INDEFINITE, |
@@ -237,7 +290,9 @@ void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
} |
if (force_pipe_reset) { |
- CloseMessagePipe(); |
+ CancelWait(); |
+ MayAutoLock locker(lock_.get()); |
+ Close(message_pipe_.Pass()); |
MessagePipe dummy_pipe; |
message_pipe_ = dummy_pipe.handle0.Pass(); |
} else { |