Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Side by Side Diff: mojo/public/cpp/bindings/lib/router.cc

Issue 1723673002: Reland "Mojo C++ bindings: support sync methods - part 2" (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/router.h" 5 #include "mojo/public/cpp/bindings/lib/router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h"
10 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/stl_util.h"
11 14
12 namespace mojo { 15 namespace mojo {
13 namespace internal { 16 namespace internal {
14 17
15 // ---------------------------------------------------------------------------- 18 // ----------------------------------------------------------------------------
16 19
17 namespace { 20 namespace {
18 21
19 class ResponderThunk : public MessageReceiverWithStatus { 22 class ResponderThunk : public MessageReceiverWithStatus {
20 public: 23 public:
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
53 56
54 private: 57 private:
55 base::WeakPtr<Router> router_; 58 base::WeakPtr<Router> router_;
56 bool accept_was_invoked_; 59 bool accept_was_invoked_;
57 }; 60 };
58 61
59 } // namespace 62 } // namespace
60 63
61 // ---------------------------------------------------------------------------- 64 // ----------------------------------------------------------------------------
62 65
66 Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
67 : response_received(in_response_received) {}
68
69 Router::SyncResponseInfo::~SyncResponseInfo() {}
70
71 // ----------------------------------------------------------------------------
72
63 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) 73 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
64 : router_(router) { 74 : router_(router) {
65 } 75 }
66 76
67 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { 77 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() {
68 } 78 }
69 79
70 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { 80 bool Router::HandleIncomingMessageThunk::Accept(Message* message) {
71 return router_->HandleIncomingMessage(message); 81 return router_->HandleIncomingMessage(message);
72 } 82 }
73 83
74 // ---------------------------------------------------------------------------- 84 // ----------------------------------------------------------------------------
75 85
76 Router::Router(ScopedMessagePipeHandle message_pipe, 86 Router::Router(ScopedMessagePipeHandle message_pipe,
77 FilterChain filters, 87 FilterChain filters,
88 bool expects_sync_requests,
78 const MojoAsyncWaiter* waiter) 89 const MojoAsyncWaiter* waiter)
79 : thunk_(this), 90 : thunk_(this),
80 filters_(std::move(filters)), 91 filters_(std::move(filters)),
81 connector_(std::move(message_pipe), 92 connector_(std::move(message_pipe),
82 Connector::SINGLE_THREADED_SEND, 93 Connector::SINGLE_THREADED_SEND,
83 waiter), 94 waiter),
84 incoming_receiver_(nullptr), 95 incoming_receiver_(nullptr),
85 next_request_id_(0), 96 next_request_id_(0),
86 testing_mode_(false), 97 testing_mode_(false),
98 pending_task_for_messages_(false),
87 weak_factory_(this) { 99 weak_factory_(this) {
88 filters_.SetSink(&thunk_); 100 filters_.SetSink(&thunk_);
101 if (expects_sync_requests)
102 connector_.RegisterSyncHandleWatch();
89 connector_.set_incoming_receiver(filters_.GetHead()); 103 connector_.set_incoming_receiver(filters_.GetHead());
90 } 104 }
91 105
92 Router::~Router() { 106 Router::~Router() {}
93 weak_factory_.InvalidateWeakPtrs();
94
95 for (auto& pair : async_responders_)
96 delete pair.second;
97 for (auto& pair : sync_responders_)
98 delete pair.second;
99 }
100 107
101 bool Router::Accept(Message* message) { 108 bool Router::Accept(Message* message) {
102 DCHECK(thread_checker_.CalledOnValidThread()); 109 DCHECK(thread_checker_.CalledOnValidThread());
103 DCHECK(!message->has_flag(kMessageExpectsResponse)); 110 DCHECK(!message->has_flag(kMessageExpectsResponse));
104 return connector_.Accept(message); 111 return connector_.Accept(message);
105 } 112 }
106 113
107 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { 114 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
108 DCHECK(thread_checker_.CalledOnValidThread()); 115 DCHECK(thread_checker_.CalledOnValidThread());
109 DCHECK(message->has_flag(kMessageExpectsResponse)); 116 DCHECK(message->has_flag(kMessageExpectsResponse));
110 117
111 // Reserve 0 in case we want it to convey special meaning in the future. 118 // Reserve 0 in case we want it to convey special meaning in the future.
112 uint64_t request_id = next_request_id_++; 119 uint64_t request_id = next_request_id_++;
113 if (request_id == 0) 120 if (request_id == 0)
114 request_id = next_request_id_++; 121 request_id = next_request_id_++;
115 122
116 message->set_request_id(request_id); 123 message->set_request_id(request_id);
117 if (!connector_.Accept(message)) 124 if (!connector_.Accept(message))
118 return false; 125 return false;
119 126
120 if (!message->has_flag(kMessageIsSync)) { 127 if (!message->has_flag(kMessageIsSync)) {
121 // We assume ownership of |responder|. 128 // We assume ownership of |responder|.
122 async_responders_[request_id] = responder; 129 async_responders_[request_id] = make_scoped_ptr(responder);
123 return true; 130 return true;
124 } 131 }
125 132
126 sync_responders_[request_id] = responder; 133 if (!connector_.RegisterSyncHandleWatch())
134 return false;
135
136 bool response_received = false;
137 scoped_ptr<MessageReceiver> sync_responder(responder);
138 sync_responses_.insert(std::make_pair(
139 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
140
127 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); 141 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
128 for (;;) { 142 bool result = connector_.RunSyncHandleWatch(&response_received);
129 // TODO(yzshen): Here we should allow incoming sync requests to re-enter and 143 // Make sure that this instance hasn't been destroyed.
130 // block async messages. 144 if (weak_self) {
131 bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); 145 DCHECK(ContainsKey(sync_responses_, request_id));
132 // The message pipe has disconnected. 146 auto iter = sync_responses_.find(request_id);
133 if (!result) 147 DCHECK_EQ(&response_received, iter->second->response_received);
134 break; 148 if (result && response_received) {
149 scoped_ptr<Message> response = std::move(iter->second->response);
150 ignore_result(sync_responder->Accept(response.get()));
151 }
152 sync_responses_.erase(iter);
135 153
136 // This instance has been destroyed. 154 connector_.UnregisterSyncHandleWatch();
137 if (!weak_self)
138 break;
139
140 // The corresponding response message has arrived.
141 if (sync_responders_.find(request_id) == sync_responders_.end())
142 break;
143 } 155 }
144 156
145 // Return true means that we take ownership of |responder|. 157 // Return true means that we take ownership of |responder|.
146 return true; 158 return true;
147 } 159 }
148 160
149 void Router::EnableTestingMode() { 161 void Router::EnableTestingMode() {
150 DCHECK(thread_checker_.CalledOnValidThread()); 162 DCHECK(thread_checker_.CalledOnValidThread());
151 testing_mode_ = true; 163 testing_mode_ = true;
152 connector_.set_enforce_errors_from_incoming_receiver(false); 164 connector_.set_enforce_errors_from_incoming_receiver(false);
153 } 165 }
154 166
155 bool Router::HandleIncomingMessage(Message* message) { 167 bool Router::HandleIncomingMessage(Message* message) {
156 DCHECK(thread_checker_.CalledOnValidThread()); 168 DCHECK(thread_checker_.CalledOnValidThread());
169
170 const bool during_sync_call =
171 connector_.during_sync_handle_watcher_callback();
172 if (!message->has_flag(kMessageIsSync) &&
173 (during_sync_call || !pending_messages_.empty())) {
174 scoped_ptr<Message> pending_message(new Message);
175 message->MoveTo(pending_message.get());
176 pending_messages_.push(std::move(pending_message));
177
178 if (!pending_task_for_messages_) {
179 pending_task_for_messages_ = true;
180 base::MessageLoop::current()->PostTask(
181 FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
182 weak_factory_.GetWeakPtr()));
183 }
184
185 return true;
186 }
187
188 return HandleMessageInternal(message);
189 }
190
191 void Router::HandleQueuedMessages() {
192 DCHECK(thread_checker_.CalledOnValidThread());
193 DCHECK(pending_task_for_messages_);
194
195 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
196 while (!pending_messages_.empty()) {
197 scoped_ptr<Message> message(std::move(pending_messages_.front()));
198 pending_messages_.pop();
199
200 bool result = HandleMessageInternal(message.get());
201 if (!weak_self)
202 return;
203
204 if (!result && !testing_mode_) {
205 connector_.RaiseError();
206 break;
207 }
208 }
209
210 pending_task_for_messages_ = false;
211 }
212
213 bool Router::HandleMessageInternal(Message* message) {
157 if (message->has_flag(kMessageExpectsResponse)) { 214 if (message->has_flag(kMessageExpectsResponse)) {
158 if (!incoming_receiver_) 215 if (!incoming_receiver_)
159 return false; 216 return false;
160 217
161 MessageReceiverWithStatus* responder = 218 MessageReceiverWithStatus* responder =
162 new ResponderThunk(weak_factory_.GetWeakPtr()); 219 new ResponderThunk(weak_factory_.GetWeakPtr());
163 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); 220 bool ok = incoming_receiver_->AcceptWithResponder(message, responder);
164 if (!ok) 221 if (!ok)
165 delete responder; 222 delete responder;
166 return ok; 223 return ok;
167 224
168 } else if (message->has_flag(kMessageIsResponse)) { 225 } else if (message->has_flag(kMessageIsResponse)) {
169 ResponderMap& responder_map = message->has_flag(kMessageIsSync)
170 ? sync_responders_
171 : async_responders_;
172 uint64_t request_id = message->request_id(); 226 uint64_t request_id = message->request_id();
173 ResponderMap::iterator it = responder_map.find(request_id); 227
174 if (it == responder_map.end()) { 228 if (message->has_flag(kMessageIsSync)) {
229 auto it = sync_responses_.find(request_id);
230 if (it == sync_responses_.end()) {
231 DCHECK(testing_mode_);
232 return false;
233 }
234 it->second->response.reset(new Message());
235 message->MoveTo(it->second->response.get());
236 *it->second->response_received = true;
237 return true;
238 }
239
240 auto it = async_responders_.find(request_id);
241 if (it == async_responders_.end()) {
175 DCHECK(testing_mode_); 242 DCHECK(testing_mode_);
176 return false; 243 return false;
177 } 244 }
178 MessageReceiver* responder = it->second; 245 scoped_ptr<MessageReceiver> responder = std::move(it->second);
179 responder_map.erase(it); 246 async_responders_.erase(it);
180 bool ok = responder->Accept(message); 247 return responder->Accept(message);
181 delete responder;
182 return ok;
183 } else { 248 } else {
184 if (!incoming_receiver_) 249 if (!incoming_receiver_)
185 return false; 250 return false;
186 251
187 return incoming_receiver_->Accept(message); 252 return incoming_receiver_->Accept(message);
188 } 253 }
189 } 254 }
190 255
191 // ---------------------------------------------------------------------------- 256 // ----------------------------------------------------------------------------
192 257
193 } // namespace internal 258 } // namespace internal
194 } // namespace mojo 259 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/router.h ('k') | mojo/public/cpp/bindings/lib/sync_handle_watcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698