Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(273)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 2608163003: Change single-interface mojo bindings to use SequencedTaskRunner. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/connector.h" 5 #include "mojo/public/cpp/bindings/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/location.h" 11 #include "base/location.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/synchronization/lock.h" 14 #include "base/synchronization/lock.h"
15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" 15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" 16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
17 17
18 namespace mojo { 18 namespace mojo {
19 19
20 Connector::Connector(ScopedMessagePipeHandle message_pipe, 20 Connector::Connector(ScopedMessagePipeHandle message_pipe,
21 ConnectorConfig config, 21 ConnectorConfig config,
22 scoped_refptr<base::SingleThreadTaskRunner> runner) 22 scoped_refptr<base::SequencedTaskRunner> runner)
23 : message_pipe_(std::move(message_pipe)), 23 : message_pipe_(std::move(message_pipe)),
24 task_runner_(std::move(runner)), 24 task_runner_(std::move(runner)),
25 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), 25 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr),
26 weak_factory_(this) { 26 weak_factory_(this) {
27 weak_self_ = weak_factory_.GetWeakPtr(); 27 weak_self_ = weak_factory_.GetWeakPtr();
28 // Even though we don't have an incoming receiver, we still want to monitor 28 // Even though we don't have an incoming receiver, we still want to monitor
29 // the message pipe to know if is closed or encounters an error. 29 // the message pipe to know if is closed or encounters an error.
30 WaitToReadMore(); 30 WaitToReadMore();
31 } 31 }
32 32
33 Connector::~Connector() { 33 Connector::~Connector() {
34 { 34 {
35 // Allow for quick destruction on any thread if the pipe is already closed. 35 // Allow for quick destruction on any thread if the pipe is already closed.
36 base::AutoLock lock(connected_lock_); 36 base::AutoLock lock(connected_lock_);
37 if (!connected_) 37 if (!connected_)
38 return; 38 return;
39 } 39 }
40 40
41 DCHECK(thread_checker_.CalledOnValidThread()); 41 DCHECK(sequence_checker_.CalledOnValidSequence());
42 CancelWait(); 42 CancelWait();
43 } 43 }
44 44
45 void Connector::CloseMessagePipe() { 45 void Connector::CloseMessagePipe() {
46 // Throw away the returned message pipe. 46 // Throw away the returned message pipe.
47 PassMessagePipe(); 47 PassMessagePipe();
48 } 48 }
49 49
50 ScopedMessagePipeHandle Connector::PassMessagePipe() { 50 ScopedMessagePipeHandle Connector::PassMessagePipe() {
51 DCHECK(thread_checker_.CalledOnValidThread()); 51 DCHECK(sequence_checker_.CalledOnValidSequence());
52 52
53 CancelWait(); 53 CancelWait();
54 internal::MayAutoLock locker(lock_.get()); 54 internal::MayAutoLock locker(lock_.get());
55 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_); 55 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
56 weak_factory_.InvalidateWeakPtrs(); 56 weak_factory_.InvalidateWeakPtrs();
57 sync_handle_watcher_callback_count_ = 0; 57 sync_handle_watcher_callback_count_ = 0;
58 58
59 base::AutoLock lock(connected_lock_); 59 base::AutoLock lock(connected_lock_);
60 connected_ = false; 60 connected_ = false;
61 return message_pipe; 61 return message_pipe;
62 } 62 }
63 63
64 void Connector::RaiseError() { 64 void Connector::RaiseError() {
65 DCHECK(thread_checker_.CalledOnValidThread()); 65 DCHECK(sequence_checker_.CalledOnValidSequence());
66 66
67 HandleError(true, true); 67 HandleError(true, true);
68 } 68 }
69 69
70 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { 70 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
71 DCHECK(thread_checker_.CalledOnValidThread()); 71 DCHECK(sequence_checker_.CalledOnValidSequence());
72 72
73 if (error_) 73 if (error_)
74 return false; 74 return false;
75 75
76 ResumeIncomingMethodCallProcessing(); 76 ResumeIncomingMethodCallProcessing();
77 77
78 MojoResult rv = 78 MojoResult rv =
79 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); 79 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
80 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED) 80 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED)
81 return false; 81 return false;
82 if (rv != MOJO_RESULT_OK) { 82 if (rv != MOJO_RESULT_OK) {
83 // Users that call WaitForIncomingMessage() should expect their code to be 83 // Users that call WaitForIncomingMessage() should expect their code to be
84 // re-entered, so we call the error handler synchronously. 84 // re-entered, so we call the error handler synchronously.
85 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); 85 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
86 return false; 86 return false;
87 } 87 }
88 ignore_result(ReadSingleMessage(&rv)); 88 ignore_result(ReadSingleMessage(&rv));
89 return (rv == MOJO_RESULT_OK); 89 return (rv == MOJO_RESULT_OK);
90 } 90 }
91 91
92 void Connector::PauseIncomingMethodCallProcessing() { 92 void Connector::PauseIncomingMethodCallProcessing() {
93 DCHECK(thread_checker_.CalledOnValidThread()); 93 DCHECK(sequence_checker_.CalledOnValidSequence());
94 94
95 if (paused_) 95 if (paused_)
96 return; 96 return;
97 97
98 paused_ = true; 98 paused_ = true;
99 CancelWait(); 99 CancelWait();
100 } 100 }
101 101
102 void Connector::ResumeIncomingMethodCallProcessing() { 102 void Connector::ResumeIncomingMethodCallProcessing() {
103 DCHECK(thread_checker_.CalledOnValidThread()); 103 DCHECK(sequence_checker_.CalledOnValidSequence());
104 104
105 if (!paused_) 105 if (!paused_)
106 return; 106 return;
107 107
108 paused_ = false; 108 paused_ = false;
109 WaitToReadMore(); 109 WaitToReadMore();
110 } 110 }
111 111
112 bool Connector::Accept(Message* message) { 112 bool Connector::Accept(Message* message) {
113 DCHECK(lock_ || thread_checker_.CalledOnValidThread()); 113 DCHECK(lock_ || sequence_checker_.CalledOnValidSequence());
114 114
115 // It shouldn't hurt even if |error_| may be changed by a different thread at 115 // It shouldn't hurt even if |error_| may be changed by a different thread at
116 // the same time. The outcome is that we may write into |message_pipe_| after 116 // the same time. The outcome is that we may write into |message_pipe_| after
117 // encountering an error, which should be fine. 117 // encountering an error, which should be fine.
118 if (error_) 118 if (error_)
119 return false; 119 return false;
120 120
121 internal::MayAutoLock locker(lock_.get()); 121 internal::MayAutoLock locker(lock_.get());
122 122
123 if (!message_pipe_.is_valid() || drop_writes_) 123 if (!message_pipe_.is_valid() || drop_writes_)
(...skipping 28 matching lines...) Expand all
152 return false; 152 return false;
153 default: 153 default:
154 // This particular write was rejected, presumably because of bad input. 154 // This particular write was rejected, presumably because of bad input.
155 // The pipe is not necessarily in a bad state. 155 // The pipe is not necessarily in a bad state.
156 return false; 156 return false;
157 } 157 }
158 return true; 158 return true;
159 } 159 }
160 160
161 void Connector::AllowWokenUpBySyncWatchOnSameThread() { 161 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
162 DCHECK(thread_checker_.CalledOnValidThread()); 162 DCHECK(sequence_checker_.CalledOnValidSequence());
163 163
164 allow_woken_up_by_others_ = true; 164 allow_woken_up_by_others_ = true;
165 165
166 EnsureSyncWatcherExists(); 166 EnsureSyncWatcherExists();
167 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 167 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
168 } 168 }
169 169
170 bool Connector::SyncWatch(const bool* should_stop) { 170 bool Connector::SyncWatch(const bool* should_stop) {
171 DCHECK(thread_checker_.CalledOnValidThread()); 171 DCHECK(sequence_checker_.CalledOnValidSequence());
172 172
173 if (error_) 173 if (error_)
174 return false; 174 return false;
175 175
176 ResumeIncomingMethodCallProcessing(); 176 ResumeIncomingMethodCallProcessing();
177 177
178 EnsureSyncWatcherExists(); 178 EnsureSyncWatcherExists();
179 return sync_watcher_->SyncWatch(should_stop); 179 return sync_watcher_->SyncWatch(should_stop);
180 } 180 }
181 181
(...skipping 14 matching lines...) Expand all
196 sync_handle_watcher_callback_count_++; 196 sync_handle_watcher_callback_count_++;
197 OnHandleReadyInternal(result); 197 OnHandleReadyInternal(result);
198 // At this point, this object might have been deleted. 198 // At this point, this object might have been deleted.
199 if (weak_self) { 199 if (weak_self) {
200 DCHECK_LT(0u, sync_handle_watcher_callback_count_); 200 DCHECK_LT(0u, sync_handle_watcher_callback_count_);
201 sync_handle_watcher_callback_count_--; 201 sync_handle_watcher_callback_count_--;
202 } 202 }
203 } 203 }
204 204
205 void Connector::OnHandleReadyInternal(MojoResult result) { 205 void Connector::OnHandleReadyInternal(MojoResult result) {
206 DCHECK(thread_checker_.CalledOnValidThread()); 206 DCHECK(sequence_checker_.CalledOnValidSequence());
207 207
208 if (result != MOJO_RESULT_OK) { 208 if (result != MOJO_RESULT_OK) {
209 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 209 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
210 return; 210 return;
211 } 211 }
212 ReadAllAvailableMessages(); 212 ReadAllAvailableMessages();
213 // At this point, this object might have been deleted. Return. 213 // At this point, this object might have been deleted. Return.
214 } 214 }
215 215
216 void Connector::WaitToReadMore() { 216 void Connector::WaitToReadMore() {
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after
333 void Connector::EnsureSyncWatcherExists() { 333 void Connector::EnsureSyncWatcherExists() {
334 if (sync_watcher_) 334 if (sync_watcher_)
335 return; 335 return;
336 sync_watcher_.reset(new SyncHandleWatcher( 336 sync_watcher_.reset(new SyncHandleWatcher(
337 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 337 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
338 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, 338 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
339 base::Unretained(this)))); 339 base::Unretained(this))));
340 } 340 }
341 341
342 } // namespace mojo 342 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/binding_state.cc ('k') | mojo/public/cpp/bindings/lib/interface_endpoint_client.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698