OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/router.h" | 5 #include "mojo/public/cpp/bindings/lib/router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | |
11 #include "base/logging.h" | 10 #include "base/logging.h" |
12 #include "base/message_loop/message_loop.h" | |
13 #include "base/stl_util.h" | |
14 | 11 |
15 namespace mojo { | 12 namespace mojo { |
16 namespace internal { | 13 namespace internal { |
17 | 14 |
18 // ---------------------------------------------------------------------------- | 15 // ---------------------------------------------------------------------------- |
19 | 16 |
20 namespace { | 17 namespace { |
21 | 18 |
22 class ResponderThunk : public MessageReceiverWithStatus { | 19 class ResponderThunk : public MessageReceiverWithStatus { |
23 public: | 20 public: |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
56 | 53 |
57 private: | 54 private: |
58 base::WeakPtr<Router> router_; | 55 base::WeakPtr<Router> router_; |
59 bool accept_was_invoked_; | 56 bool accept_was_invoked_; |
60 }; | 57 }; |
61 | 58 |
62 } // namespace | 59 } // namespace |
63 | 60 |
64 // ---------------------------------------------------------------------------- | 61 // ---------------------------------------------------------------------------- |
65 | 62 |
66 Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received) | |
67 : response_received(in_response_received) {} | |
68 | |
69 Router::SyncResponseInfo::~SyncResponseInfo() {} | |
70 | |
71 // ---------------------------------------------------------------------------- | |
72 | |
73 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) | 63 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) |
74 : router_(router) { | 64 : router_(router) { |
75 } | 65 } |
76 | 66 |
77 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { | 67 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { |
78 } | 68 } |
79 | 69 |
80 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { | 70 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { |
81 return router_->HandleIncomingMessage(message); | 71 return router_->HandleIncomingMessage(message); |
82 } | 72 } |
83 | 73 |
84 // ---------------------------------------------------------------------------- | 74 // ---------------------------------------------------------------------------- |
85 | 75 |
86 Router::Router(ScopedMessagePipeHandle message_pipe, | 76 Router::Router(ScopedMessagePipeHandle message_pipe, |
87 FilterChain filters, | 77 FilterChain filters, |
88 bool expects_sync_requests, | |
89 const MojoAsyncWaiter* waiter) | 78 const MojoAsyncWaiter* waiter) |
90 : thunk_(this), | 79 : thunk_(this), |
91 filters_(std::move(filters)), | 80 filters_(std::move(filters)), |
92 connector_(std::move(message_pipe), | 81 connector_(std::move(message_pipe), |
93 Connector::SINGLE_THREADED_SEND, | 82 Connector::SINGLE_THREADED_SEND, |
94 waiter), | 83 waiter), |
95 incoming_receiver_(nullptr), | 84 incoming_receiver_(nullptr), |
96 next_request_id_(0), | 85 next_request_id_(0), |
97 testing_mode_(false), | 86 testing_mode_(false), |
98 pending_task_for_messages_(false), | |
99 weak_factory_(this) { | 87 weak_factory_(this) { |
100 filters_.SetSink(&thunk_); | 88 filters_.SetSink(&thunk_); |
101 if (expects_sync_requests) | |
102 connector_.RegisterSyncHandleWatch(); | |
103 connector_.set_incoming_receiver(filters_.GetHead()); | 89 connector_.set_incoming_receiver(filters_.GetHead()); |
104 } | 90 } |
105 | 91 |
106 Router::~Router() {} | 92 Router::~Router() { |
| 93 weak_factory_.InvalidateWeakPtrs(); |
| 94 |
| 95 for (auto& pair : async_responders_) |
| 96 delete pair.second; |
| 97 for (auto& pair : sync_responders_) |
| 98 delete pair.second; |
| 99 } |
107 | 100 |
108 bool Router::Accept(Message* message) { | 101 bool Router::Accept(Message* message) { |
109 DCHECK(thread_checker_.CalledOnValidThread()); | 102 DCHECK(thread_checker_.CalledOnValidThread()); |
110 DCHECK(!message->has_flag(kMessageExpectsResponse)); | 103 DCHECK(!message->has_flag(kMessageExpectsResponse)); |
111 return connector_.Accept(message); | 104 return connector_.Accept(message); |
112 } | 105 } |
113 | 106 |
114 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { | 107 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { |
115 DCHECK(thread_checker_.CalledOnValidThread()); | 108 DCHECK(thread_checker_.CalledOnValidThread()); |
116 DCHECK(message->has_flag(kMessageExpectsResponse)); | 109 DCHECK(message->has_flag(kMessageExpectsResponse)); |
117 | 110 |
118 // Reserve 0 in case we want it to convey special meaning in the future. | 111 // Reserve 0 in case we want it to convey special meaning in the future. |
119 uint64_t request_id = next_request_id_++; | 112 uint64_t request_id = next_request_id_++; |
120 if (request_id == 0) | 113 if (request_id == 0) |
121 request_id = next_request_id_++; | 114 request_id = next_request_id_++; |
122 | 115 |
123 message->set_request_id(request_id); | 116 message->set_request_id(request_id); |
124 if (!connector_.Accept(message)) | 117 if (!connector_.Accept(message)) |
125 return false; | 118 return false; |
126 | 119 |
127 if (!message->has_flag(kMessageIsSync)) { | 120 if (!message->has_flag(kMessageIsSync)) { |
128 // We assume ownership of |responder|. | 121 // We assume ownership of |responder|. |
129 async_responders_[request_id] = make_scoped_ptr(responder); | 122 async_responders_[request_id] = responder; |
130 return true; | 123 return true; |
131 } | 124 } |
132 | 125 |
133 if (!connector_.RegisterSyncHandleWatch()) | 126 sync_responders_[request_id] = responder; |
134 return false; | |
135 | |
136 bool response_received = false; | |
137 scoped_ptr<MessageReceiver> sync_responder(responder); | |
138 sync_responses_.insert(std::make_pair( | |
139 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); | |
140 | |
141 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | 127 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
142 do { | 128 for (;;) { |
143 bool result = connector_.RunSyncHandleWatch(&response_received); | 129 // TODO(yzshen): Here we should allow incoming sync requests to re-enter and |
| 130 // block async messages. |
| 131 bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); |
| 132 // The message pipe has disconnected. |
144 if (!result) | 133 if (!result) |
145 break; | 134 break; |
146 | 135 |
147 // This instance has been destroyed. | 136 // This instance has been destroyed. |
148 if (!weak_self) | 137 if (!weak_self) |
149 break; | 138 break; |
150 | 139 |
151 // The corresponding response message has arrived. | 140 // The corresponding response message has arrived. |
152 DCHECK(response_received); | 141 if (sync_responders_.find(request_id) == sync_responders_.end()) |
153 DCHECK(ContainsKey(sync_responses_, request_id)); | 142 break; |
154 auto iter = sync_responses_.find(request_id); | 143 } |
155 DCHECK_EQ(&response_received, iter->second->response_received); | |
156 scoped_ptr<Message> response = std::move(iter->second->response); | |
157 sync_responses_.erase(iter); | |
158 ignore_result(sync_responder->Accept(response.get())); | |
159 } while (false); | |
160 | |
161 if (weak_self) | |
162 connector_.UnregisterSyncHandleWatch(); | |
163 | 144 |
164 // Return true means that we take ownership of |responder|. | 145 // Return true means that we take ownership of |responder|. |
165 return true; | 146 return true; |
166 } | 147 } |
167 | 148 |
168 void Router::EnableTestingMode() { | 149 void Router::EnableTestingMode() { |
169 DCHECK(thread_checker_.CalledOnValidThread()); | 150 DCHECK(thread_checker_.CalledOnValidThread()); |
170 testing_mode_ = true; | 151 testing_mode_ = true; |
171 connector_.set_enforce_errors_from_incoming_receiver(false); | 152 connector_.set_enforce_errors_from_incoming_receiver(false); |
172 } | 153 } |
173 | 154 |
174 bool Router::HandleIncomingMessage(Message* message) { | 155 bool Router::HandleIncomingMessage(Message* message) { |
175 DCHECK(thread_checker_.CalledOnValidThread()); | 156 DCHECK(thread_checker_.CalledOnValidThread()); |
176 | |
177 const bool during_sync_call = | |
178 connector_.during_sync_handle_watcher_callback(); | |
179 if (!message->has_flag(kMessageIsSync) && | |
180 (during_sync_call || !pending_messages_.empty())) { | |
181 scoped_ptr<Message> pending_message(new Message); | |
182 message->MoveTo(pending_message.get()); | |
183 pending_messages_.push(std::move(pending_message)); | |
184 | |
185 if (!pending_task_for_messages_) { | |
186 pending_task_for_messages_ = true; | |
187 base::MessageLoop::current()->PostTask( | |
188 FROM_HERE, base::Bind(&Router::HandleQueuedMessages, | |
189 weak_factory_.GetWeakPtr())); | |
190 } | |
191 | |
192 return true; | |
193 } | |
194 | |
195 return HandleMessageInternal(message); | |
196 } | |
197 | |
198 void Router::HandleQueuedMessages() { | |
199 DCHECK(thread_checker_.CalledOnValidThread()); | |
200 DCHECK(pending_task_for_messages_); | |
201 | |
202 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | |
203 while (!pending_messages_.empty()) { | |
204 scoped_ptr<Message> message(std::move(pending_messages_.front())); | |
205 pending_messages_.pop(); | |
206 | |
207 bool result = HandleMessageInternal(message.get()); | |
208 if (!weak_self) | |
209 return; | |
210 | |
211 if (!result && !testing_mode_) { | |
212 connector_.RaiseError(); | |
213 break; | |
214 } | |
215 } | |
216 | |
217 pending_task_for_messages_ = false; | |
218 } | |
219 | |
220 bool Router::HandleMessageInternal(Message* message) { | |
221 if (message->has_flag(kMessageExpectsResponse)) { | 157 if (message->has_flag(kMessageExpectsResponse)) { |
222 if (!incoming_receiver_) | 158 if (!incoming_receiver_) |
223 return false; | 159 return false; |
224 | 160 |
225 MessageReceiverWithStatus* responder = | 161 MessageReceiverWithStatus* responder = |
226 new ResponderThunk(weak_factory_.GetWeakPtr()); | 162 new ResponderThunk(weak_factory_.GetWeakPtr()); |
227 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); | 163 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); |
228 if (!ok) | 164 if (!ok) |
229 delete responder; | 165 delete responder; |
230 return ok; | 166 return ok; |
231 | 167 |
232 } else if (message->has_flag(kMessageIsResponse)) { | 168 } else if (message->has_flag(kMessageIsResponse)) { |
| 169 ResponderMap& responder_map = message->has_flag(kMessageIsSync) |
| 170 ? sync_responders_ |
| 171 : async_responders_; |
233 uint64_t request_id = message->request_id(); | 172 uint64_t request_id = message->request_id(); |
234 | 173 ResponderMap::iterator it = responder_map.find(request_id); |
235 if (message->has_flag(kMessageIsSync)) { | 174 if (it == responder_map.end()) { |
236 auto it = sync_responses_.find(request_id); | |
237 if (it == sync_responses_.end()) { | |
238 DCHECK(testing_mode_); | |
239 return false; | |
240 } | |
241 it->second->response.reset(new Message()); | |
242 message->MoveTo(it->second->response.get()); | |
243 *it->second->response_received = true; | |
244 return true; | |
245 } | |
246 | |
247 auto it = async_responders_.find(request_id); | |
248 if (it == async_responders_.end()) { | |
249 DCHECK(testing_mode_); | 175 DCHECK(testing_mode_); |
250 return false; | 176 return false; |
251 } | 177 } |
252 scoped_ptr<MessageReceiver> responder = std::move(it->second); | 178 MessageReceiver* responder = it->second; |
253 async_responders_.erase(it); | 179 responder_map.erase(it); |
254 return responder->Accept(message); | 180 bool ok = responder->Accept(message); |
| 181 delete responder; |
| 182 return ok; |
255 } else { | 183 } else { |
256 if (!incoming_receiver_) | 184 if (!incoming_receiver_) |
257 return false; | 185 return false; |
258 | 186 |
259 return incoming_receiver_->Accept(message); | 187 return incoming_receiver_->Accept(message); |
260 } | 188 } |
261 } | 189 } |
262 | 190 |
263 // ---------------------------------------------------------------------------- | 191 // ---------------------------------------------------------------------------- |
264 | 192 |
265 } // namespace internal | 193 } // namespace internal |
266 } // namespace mojo | 194 } // namespace mojo |
OLD | NEW |