Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(156)

Side by Side Diff: mojo/public/cpp/bindings/lib/router.cc

Issue 1713203002: Mojo C++ bindings: support sync methods - part 2 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/router.h" 5 #include "mojo/public/cpp/bindings/lib/router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h"
10 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/stl_util.h"
11 14
12 namespace mojo { 15 namespace mojo {
13 namespace internal { 16 namespace internal {
14 17
15 // ---------------------------------------------------------------------------- 18 // ----------------------------------------------------------------------------
16 19
17 namespace { 20 namespace {
18 21
19 class ResponderThunk : public MessageReceiverWithStatus { 22 class ResponderThunk : public MessageReceiverWithStatus {
20 public: 23 public:
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
53 56
54 private: 57 private:
55 base::WeakPtr<Router> router_; 58 base::WeakPtr<Router> router_;
56 bool accept_was_invoked_; 59 bool accept_was_invoked_;
57 }; 60 };
58 61
59 } // namespace 62 } // namespace
60 63
61 // ---------------------------------------------------------------------------- 64 // ----------------------------------------------------------------------------
62 65
66 Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
67 : response_received(in_response_received) {}
68
69 Router::SyncResponseInfo::~SyncResponseInfo() {}
70
71 // ----------------------------------------------------------------------------
72
63 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) 73 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
64 : router_(router) { 74 : router_(router) {
65 } 75 }
66 76
67 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { 77 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() {
68 } 78 }
69 79
70 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { 80 bool Router::HandleIncomingMessageThunk::Accept(Message* message) {
71 return router_->HandleIncomingMessage(message); 81 return router_->HandleIncomingMessage(message);
72 } 82 }
73 83
74 // ---------------------------------------------------------------------------- 84 // ----------------------------------------------------------------------------
75 85
76 Router::Router(ScopedMessagePipeHandle message_pipe, 86 Router::Router(ScopedMessagePipeHandle message_pipe,
77 FilterChain filters, 87 FilterChain filters,
88 bool expects_sync_requests,
78 const MojoAsyncWaiter* waiter) 89 const MojoAsyncWaiter* waiter)
79 : thunk_(this), 90 : thunk_(this),
80 filters_(std::move(filters)), 91 filters_(std::move(filters)),
81 connector_(std::move(message_pipe), 92 connector_(std::move(message_pipe),
82 Connector::SINGLE_THREADED_SEND, 93 Connector::SINGLE_THREADED_SEND,
83 waiter), 94 waiter),
84 incoming_receiver_(nullptr), 95 incoming_receiver_(nullptr),
85 next_request_id_(0), 96 next_request_id_(0),
86 testing_mode_(false), 97 testing_mode_(false),
98 pending_task_for_messages_(false),
87 weak_factory_(this) { 99 weak_factory_(this) {
88 filters_.SetSink(&thunk_); 100 filters_.SetSink(&thunk_);
101 if (expects_sync_requests)
102 connector_.RegisterSyncHandleWatch();
89 connector_.set_incoming_receiver(filters_.GetHead()); 103 connector_.set_incoming_receiver(filters_.GetHead());
90 } 104 }
91 105
92 Router::~Router() { 106 Router::~Router() {}
93 weak_factory_.InvalidateWeakPtrs();
94
95 for (auto& pair : async_responders_)
96 delete pair.second;
97 for (auto& pair : sync_responders_)
98 delete pair.second;
99 }
100 107
101 bool Router::Accept(Message* message) { 108 bool Router::Accept(Message* message) {
102 DCHECK(thread_checker_.CalledOnValidThread()); 109 DCHECK(thread_checker_.CalledOnValidThread());
103 DCHECK(!message->has_flag(kMessageExpectsResponse)); 110 DCHECK(!message->has_flag(kMessageExpectsResponse));
104 return connector_.Accept(message); 111 return connector_.Accept(message);
105 } 112 }
106 113
107 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { 114 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
108 DCHECK(thread_checker_.CalledOnValidThread()); 115 DCHECK(thread_checker_.CalledOnValidThread());
109 DCHECK(message->has_flag(kMessageExpectsResponse)); 116 DCHECK(message->has_flag(kMessageExpectsResponse));
110 117
111 // Reserve 0 in case we want it to convey special meaning in the future. 118 // Reserve 0 in case we want it to convey special meaning in the future.
112 uint64_t request_id = next_request_id_++; 119 uint64_t request_id = next_request_id_++;
113 if (request_id == 0) 120 if (request_id == 0)
114 request_id = next_request_id_++; 121 request_id = next_request_id_++;
115 122
116 message->set_request_id(request_id); 123 message->set_request_id(request_id);
117 if (!connector_.Accept(message)) 124 if (!connector_.Accept(message))
118 return false; 125 return false;
119 126
120 if (!message->has_flag(kMessageIsSync)) { 127 if (!message->has_flag(kMessageIsSync)) {
121 // We assume ownership of |responder|. 128 // We assume ownership of |responder|.
122 async_responders_[request_id] = responder; 129 async_responders_[request_id] = make_scoped_ptr(responder);
123 return true; 130 return true;
124 } 131 }
125 132
126 sync_responders_[request_id] = responder; 133 if (!connector_.RegisterSyncHandleWatch())
134 return false;
135
136 bool response_received = false;
137 scoped_ptr<MessageReceiver> sync_responder(responder);
138 sync_responses_.insert(std::make_pair(
139 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
140
127 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); 141 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
128 for (;;) { 142 do {
129 // TODO(yzshen): Here we should allow incoming sync requests to re-enter and 143 bool result = connector_.RunSyncHandleWatch(&response_received);
130 // block async messages.
131 bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
132 // The message pipe has disconnected.
133 if (!result) 144 if (!result)
134 break; 145 break;
135 146
136 // This instance has been destroyed. 147 // This instance has been destroyed.
137 if (!weak_self) 148 if (!weak_self)
138 break; 149 break;
139 150
140 // The corresponding response message has arrived. 151 // The corresponding response message has arrived.
141 if (sync_responders_.find(request_id) == sync_responders_.end()) 152 DCHECK(response_received);
142 break; 153 DCHECK(ContainsKey(sync_responses_, request_id));
143 } 154 auto iter = sync_responses_.find(request_id);
155 DCHECK_EQ(&response_received, iter->second->response_received);
156 scoped_ptr<Message> response = std::move(iter->second->response);
157 sync_responses_.erase(iter);
158 ignore_result(sync_responder->Accept(response.get()));
159 } while (false);
160
161 if (weak_self)
162 connector_.UnregisterSyncHandleWatch();
144 163
145 // Return true means that we take ownership of |responder|. 164 // Return true means that we take ownership of |responder|.
146 return true; 165 return true;
147 } 166 }
148 167
149 void Router::EnableTestingMode() { 168 void Router::EnableTestingMode() {
150 DCHECK(thread_checker_.CalledOnValidThread()); 169 DCHECK(thread_checker_.CalledOnValidThread());
151 testing_mode_ = true; 170 testing_mode_ = true;
152 connector_.set_enforce_errors_from_incoming_receiver(false); 171 connector_.set_enforce_errors_from_incoming_receiver(false);
153 } 172 }
154 173
155 bool Router::HandleIncomingMessage(Message* message) { 174 bool Router::HandleIncomingMessage(Message* message) {
156 DCHECK(thread_checker_.CalledOnValidThread()); 175 DCHECK(thread_checker_.CalledOnValidThread());
176
177 const bool during_sync_call =
178 connector_.during_sync_handle_watcher_callback();
179 if (!message->has_flag(kMessageIsSync) &&
180 (during_sync_call || !pending_messages_.empty())) {
181 scoped_ptr<Message> pending_message(new Message);
182 message->MoveTo(pending_message.get());
183 pending_messages_.push(std::move(pending_message));
184
185 if (!pending_task_for_messages_) {
186 pending_task_for_messages_ = true;
187 base::MessageLoop::current()->PostTask(
188 FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
189 weak_factory_.GetWeakPtr()));
190 }
191
192 return true;
193 }
194
195 return HandleMessageInternal(message);
196 }
197
198 void Router::HandleQueuedMessages() {
199 DCHECK(thread_checker_.CalledOnValidThread());
200 DCHECK(pending_task_for_messages_);
201
202 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
203 while (!pending_messages_.empty()) {
204 scoped_ptr<Message> message(std::move(pending_messages_.front()));
205 pending_messages_.pop();
206
207 bool result = HandleMessageInternal(message.get());
208 if (!weak_self)
209 return;
210
211 if (!result && !testing_mode_) {
212 connector_.RaiseError();
213 break;
214 }
215 }
216
217 pending_task_for_messages_ = false;
218 }
219
220 bool Router::HandleMessageInternal(Message* message) {
157 if (message->has_flag(kMessageExpectsResponse)) { 221 if (message->has_flag(kMessageExpectsResponse)) {
158 if (!incoming_receiver_) 222 if (!incoming_receiver_)
159 return false; 223 return false;
160 224
161 MessageReceiverWithStatus* responder = 225 MessageReceiverWithStatus* responder =
162 new ResponderThunk(weak_factory_.GetWeakPtr()); 226 new ResponderThunk(weak_factory_.GetWeakPtr());
163 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); 227 bool ok = incoming_receiver_->AcceptWithResponder(message, responder);
164 if (!ok) 228 if (!ok)
165 delete responder; 229 delete responder;
166 return ok; 230 return ok;
167 231
168 } else if (message->has_flag(kMessageIsResponse)) { 232 } else if (message->has_flag(kMessageIsResponse)) {
169 ResponderMap& responder_map = message->has_flag(kMessageIsSync)
170 ? sync_responders_
171 : async_responders_;
172 uint64_t request_id = message->request_id(); 233 uint64_t request_id = message->request_id();
173 ResponderMap::iterator it = responder_map.find(request_id); 234
174 if (it == responder_map.end()) { 235 if (message->has_flag(kMessageIsSync)) {
236 auto it = sync_responses_.find(request_id);
237 if (it == sync_responses_.end()) {
238 DCHECK(testing_mode_);
239 return false;
240 }
241 it->second->response.reset(new Message());
242 message->MoveTo(it->second->response.get());
243 *it->second->response_received = true;
244 return true;
245 }
246
247 auto it = async_responders_.find(request_id);
248 if (it == async_responders_.end()) {
175 DCHECK(testing_mode_); 249 DCHECK(testing_mode_);
176 return false; 250 return false;
177 } 251 }
178 MessageReceiver* responder = it->second; 252 scoped_ptr<MessageReceiver> responder = std::move(it->second);
179 responder_map.erase(it); 253 async_responders_.erase(it);
180 bool ok = responder->Accept(message); 254 return responder->Accept(message);
181 delete responder;
182 return ok;
183 } else { 255 } else {
184 if (!incoming_receiver_) 256 if (!incoming_receiver_)
185 return false; 257 return false;
186 258
187 return incoming_receiver_->Accept(message); 259 return incoming_receiver_->Accept(message);
188 } 260 }
189 } 261 }
190 262
191 // ---------------------------------------------------------------------------- 263 // ----------------------------------------------------------------------------
192 264
193 } // namespace internal 265 } // namespace internal
194 } // namespace mojo 266 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/router.h ('k') | mojo/public/cpp/bindings/lib/sync_handle_watcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698