Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(84)

Side by Side Diff: mojo/public/cpp/bindings/lib/router.cc

Issue 1720093002: Revert of Mojo C++ bindings: support sync methods - part 2 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/router.h" 5 #include "mojo/public/cpp/bindings/lib/router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h"
11 #include "base/logging.h" 10 #include "base/logging.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/stl_util.h"
14 11
15 namespace mojo { 12 namespace mojo {
16 namespace internal { 13 namespace internal {
17 14
18 // ---------------------------------------------------------------------------- 15 // ----------------------------------------------------------------------------
19 16
20 namespace { 17 namespace {
21 18
22 class ResponderThunk : public MessageReceiverWithStatus { 19 class ResponderThunk : public MessageReceiverWithStatus {
23 public: 20 public:
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
56 53
57 private: 54 private:
58 base::WeakPtr<Router> router_; 55 base::WeakPtr<Router> router_;
59 bool accept_was_invoked_; 56 bool accept_was_invoked_;
60 }; 57 };
61 58
62 } // namespace 59 } // namespace
63 60
64 // ---------------------------------------------------------------------------- 61 // ----------------------------------------------------------------------------
65 62
66 Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
67 : response_received(in_response_received) {}
68
69 Router::SyncResponseInfo::~SyncResponseInfo() {}
70
71 // ----------------------------------------------------------------------------
72
73 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) 63 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
74 : router_(router) { 64 : router_(router) {
75 } 65 }
76 66
77 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { 67 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() {
78 } 68 }
79 69
80 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { 70 bool Router::HandleIncomingMessageThunk::Accept(Message* message) {
81 return router_->HandleIncomingMessage(message); 71 return router_->HandleIncomingMessage(message);
82 } 72 }
83 73
84 // ---------------------------------------------------------------------------- 74 // ----------------------------------------------------------------------------
85 75
86 Router::Router(ScopedMessagePipeHandle message_pipe, 76 Router::Router(ScopedMessagePipeHandle message_pipe,
87 FilterChain filters, 77 FilterChain filters,
88 bool expects_sync_requests,
89 const MojoAsyncWaiter* waiter) 78 const MojoAsyncWaiter* waiter)
90 : thunk_(this), 79 : thunk_(this),
91 filters_(std::move(filters)), 80 filters_(std::move(filters)),
92 connector_(std::move(message_pipe), 81 connector_(std::move(message_pipe),
93 Connector::SINGLE_THREADED_SEND, 82 Connector::SINGLE_THREADED_SEND,
94 waiter), 83 waiter),
95 incoming_receiver_(nullptr), 84 incoming_receiver_(nullptr),
96 next_request_id_(0), 85 next_request_id_(0),
97 testing_mode_(false), 86 testing_mode_(false),
98 pending_task_for_messages_(false),
99 weak_factory_(this) { 87 weak_factory_(this) {
100 filters_.SetSink(&thunk_); 88 filters_.SetSink(&thunk_);
101 if (expects_sync_requests)
102 connector_.RegisterSyncHandleWatch();
103 connector_.set_incoming_receiver(filters_.GetHead()); 89 connector_.set_incoming_receiver(filters_.GetHead());
104 } 90 }
105 91
106 Router::~Router() {} 92 Router::~Router() {
93 weak_factory_.InvalidateWeakPtrs();
94
95 for (auto& pair : async_responders_)
96 delete pair.second;
97 for (auto& pair : sync_responders_)
98 delete pair.second;
99 }
107 100
108 bool Router::Accept(Message* message) { 101 bool Router::Accept(Message* message) {
109 DCHECK(thread_checker_.CalledOnValidThread()); 102 DCHECK(thread_checker_.CalledOnValidThread());
110 DCHECK(!message->has_flag(kMessageExpectsResponse)); 103 DCHECK(!message->has_flag(kMessageExpectsResponse));
111 return connector_.Accept(message); 104 return connector_.Accept(message);
112 } 105 }
113 106
114 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { 107 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
115 DCHECK(thread_checker_.CalledOnValidThread()); 108 DCHECK(thread_checker_.CalledOnValidThread());
116 DCHECK(message->has_flag(kMessageExpectsResponse)); 109 DCHECK(message->has_flag(kMessageExpectsResponse));
117 110
118 // Reserve 0 in case we want it to convey special meaning in the future. 111 // Reserve 0 in case we want it to convey special meaning in the future.
119 uint64_t request_id = next_request_id_++; 112 uint64_t request_id = next_request_id_++;
120 if (request_id == 0) 113 if (request_id == 0)
121 request_id = next_request_id_++; 114 request_id = next_request_id_++;
122 115
123 message->set_request_id(request_id); 116 message->set_request_id(request_id);
124 if (!connector_.Accept(message)) 117 if (!connector_.Accept(message))
125 return false; 118 return false;
126 119
127 if (!message->has_flag(kMessageIsSync)) { 120 if (!message->has_flag(kMessageIsSync)) {
128 // We assume ownership of |responder|. 121 // We assume ownership of |responder|.
129 async_responders_[request_id] = make_scoped_ptr(responder); 122 async_responders_[request_id] = responder;
130 return true; 123 return true;
131 } 124 }
132 125
133 if (!connector_.RegisterSyncHandleWatch()) 126 sync_responders_[request_id] = responder;
134 return false;
135
136 bool response_received = false;
137 scoped_ptr<MessageReceiver> sync_responder(responder);
138 sync_responses_.insert(std::make_pair(
139 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
140
141 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); 127 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
142 do { 128 for (;;) {
143 bool result = connector_.RunSyncHandleWatch(&response_received); 129 // TODO(yzshen): Here we should allow incoming sync requests to re-enter and
130 // block async messages.
131 bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
132 // The message pipe has disconnected.
144 if (!result) 133 if (!result)
145 break; 134 break;
146 135
147 // This instance has been destroyed. 136 // This instance has been destroyed.
148 if (!weak_self) 137 if (!weak_self)
149 break; 138 break;
150 139
151 // The corresponding response message has arrived. 140 // The corresponding response message has arrived.
152 DCHECK(response_received); 141 if (sync_responders_.find(request_id) == sync_responders_.end())
153 DCHECK(ContainsKey(sync_responses_, request_id)); 142 break;
154 auto iter = sync_responses_.find(request_id); 143 }
155 DCHECK_EQ(&response_received, iter->second->response_received);
156 scoped_ptr<Message> response = std::move(iter->second->response);
157 sync_responses_.erase(iter);
158 ignore_result(sync_responder->Accept(response.get()));
159 } while (false);
160
161 if (weak_self)
162 connector_.UnregisterSyncHandleWatch();
163 144
164 // Return true means that we take ownership of |responder|. 145 // Return true means that we take ownership of |responder|.
165 return true; 146 return true;
166 } 147 }
167 148
168 void Router::EnableTestingMode() { 149 void Router::EnableTestingMode() {
169 DCHECK(thread_checker_.CalledOnValidThread()); 150 DCHECK(thread_checker_.CalledOnValidThread());
170 testing_mode_ = true; 151 testing_mode_ = true;
171 connector_.set_enforce_errors_from_incoming_receiver(false); 152 connector_.set_enforce_errors_from_incoming_receiver(false);
172 } 153 }
173 154
174 bool Router::HandleIncomingMessage(Message* message) { 155 bool Router::HandleIncomingMessage(Message* message) {
175 DCHECK(thread_checker_.CalledOnValidThread()); 156 DCHECK(thread_checker_.CalledOnValidThread());
176
177 const bool during_sync_call =
178 connector_.during_sync_handle_watcher_callback();
179 if (!message->has_flag(kMessageIsSync) &&
180 (during_sync_call || !pending_messages_.empty())) {
181 scoped_ptr<Message> pending_message(new Message);
182 message->MoveTo(pending_message.get());
183 pending_messages_.push(std::move(pending_message));
184
185 if (!pending_task_for_messages_) {
186 pending_task_for_messages_ = true;
187 base::MessageLoop::current()->PostTask(
188 FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
189 weak_factory_.GetWeakPtr()));
190 }
191
192 return true;
193 }
194
195 return HandleMessageInternal(message);
196 }
197
198 void Router::HandleQueuedMessages() {
199 DCHECK(thread_checker_.CalledOnValidThread());
200 DCHECK(pending_task_for_messages_);
201
202 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
203 while (!pending_messages_.empty()) {
204 scoped_ptr<Message> message(std::move(pending_messages_.front()));
205 pending_messages_.pop();
206
207 bool result = HandleMessageInternal(message.get());
208 if (!weak_self)
209 return;
210
211 if (!result && !testing_mode_) {
212 connector_.RaiseError();
213 break;
214 }
215 }
216
217 pending_task_for_messages_ = false;
218 }
219
220 bool Router::HandleMessageInternal(Message* message) {
221 if (message->has_flag(kMessageExpectsResponse)) { 157 if (message->has_flag(kMessageExpectsResponse)) {
222 if (!incoming_receiver_) 158 if (!incoming_receiver_)
223 return false; 159 return false;
224 160
225 MessageReceiverWithStatus* responder = 161 MessageReceiverWithStatus* responder =
226 new ResponderThunk(weak_factory_.GetWeakPtr()); 162 new ResponderThunk(weak_factory_.GetWeakPtr());
227 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); 163 bool ok = incoming_receiver_->AcceptWithResponder(message, responder);
228 if (!ok) 164 if (!ok)
229 delete responder; 165 delete responder;
230 return ok; 166 return ok;
231 167
232 } else if (message->has_flag(kMessageIsResponse)) { 168 } else if (message->has_flag(kMessageIsResponse)) {
169 ResponderMap& responder_map = message->has_flag(kMessageIsSync)
170 ? sync_responders_
171 : async_responders_;
233 uint64_t request_id = message->request_id(); 172 uint64_t request_id = message->request_id();
234 173 ResponderMap::iterator it = responder_map.find(request_id);
235 if (message->has_flag(kMessageIsSync)) { 174 if (it == responder_map.end()) {
236 auto it = sync_responses_.find(request_id);
237 if (it == sync_responses_.end()) {
238 DCHECK(testing_mode_);
239 return false;
240 }
241 it->second->response.reset(new Message());
242 message->MoveTo(it->second->response.get());
243 *it->second->response_received = true;
244 return true;
245 }
246
247 auto it = async_responders_.find(request_id);
248 if (it == async_responders_.end()) {
249 DCHECK(testing_mode_); 175 DCHECK(testing_mode_);
250 return false; 176 return false;
251 } 177 }
252 scoped_ptr<MessageReceiver> responder = std::move(it->second); 178 MessageReceiver* responder = it->second;
253 async_responders_.erase(it); 179 responder_map.erase(it);
254 return responder->Accept(message); 180 bool ok = responder->Accept(message);
181 delete responder;
182 return ok;
255 } else { 183 } else {
256 if (!incoming_receiver_) 184 if (!incoming_receiver_)
257 return false; 185 return false;
258 186
259 return incoming_receiver_->Accept(message); 187 return incoming_receiver_->Accept(message);
260 } 188 }
261 } 189 }
262 190
263 // ---------------------------------------------------------------------------- 191 // ----------------------------------------------------------------------------
264 192
265 } // namespace internal 193 } // namespace internal
266 } // namespace mojo 194 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/router.h ('k') | mojo/public/cpp/bindings/lib/sync_handle_watcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698