OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/router.h" | 5 #include "mojo/public/cpp/bindings/lib/router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
11 | 11 |
12 namespace mojo { | 12 namespace mojo { |
13 namespace internal { | 13 namespace internal { |
14 | 14 |
15 // ---------------------------------------------------------------------------- | 15 // ---------------------------------------------------------------------------- |
16 | 16 |
17 namespace { | 17 namespace { |
18 | 18 |
19 class ResponderThunk : public MessageReceiverWithStatus { | 19 class ResponderThunk : public MessageReceiverWithStatus { |
20 public: | 20 public: |
21 explicit ResponderThunk(const SharedData<Router*>& router) | 21 explicit ResponderThunk(const base::WeakPtr<Router>& router) |
22 : router_(router), accept_was_invoked_(false) {} | 22 : router_(router), accept_was_invoked_(false) {} |
23 ~ResponderThunk() override { | 23 ~ResponderThunk() override { |
24 if (!accept_was_invoked_) { | 24 if (!accept_was_invoked_) { |
25 // The Mojo application handled a message that was expecting a response | 25 // The Mojo application handled a message that was expecting a response |
26 // but did not send a response. | 26 // but did not send a response. |
27 Router* router = router_.value(); | 27 if (router_) { |
28 if (router) { | |
29 // We raise an error to signal the calling application that an error | 28 // We raise an error to signal the calling application that an error |
30 // condition occurred. Without this the calling application would have | 29 // condition occurred. Without this the calling application would have |
31 // no way of knowing it should stop waiting for a response. | 30 // no way of knowing it should stop waiting for a response. |
32 router->RaiseError(); | 31 router_->RaiseError(); |
33 } | 32 } |
34 } | 33 } |
35 } | 34 } |
36 | 35 |
37 // MessageReceiver implementation: | 36 // MessageReceiver implementation: |
38 bool Accept(Message* message) override { | 37 bool Accept(Message* message) override { |
39 accept_was_invoked_ = true; | 38 accept_was_invoked_ = true; |
40 DCHECK(message->has_flag(kMessageIsResponse)); | 39 DCHECK(message->has_flag(kMessageIsResponse)); |
41 | 40 |
42 bool result = false; | 41 bool result = false; |
43 | 42 |
44 Router* router = router_.value(); | 43 if (router_) |
45 if (router) | 44 result = router_->Accept(message); |
46 result = router->Accept(message); | |
47 | 45 |
48 return result; | 46 return result; |
49 } | 47 } |
50 | 48 |
51 // MessageReceiverWithStatus implementation: | 49 // MessageReceiverWithStatus implementation: |
52 bool IsValid() override { | 50 bool IsValid() override { |
53 Router* router = router_.value(); | 51 return router_ && !router_->encountered_error() && router_->is_valid(); |
54 return router && !router->encountered_error() && router->is_valid(); | |
55 } | 52 } |
56 | 53 |
57 private: | 54 private: |
58 SharedData<Router*> router_; | 55 base::WeakPtr<Router> router_; |
59 bool accept_was_invoked_; | 56 bool accept_was_invoked_; |
60 }; | 57 }; |
61 | 58 |
62 } // namespace | 59 } // namespace |
63 | 60 |
64 // ---------------------------------------------------------------------------- | 61 // ---------------------------------------------------------------------------- |
65 | 62 |
66 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) | 63 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) |
67 : router_(router) { | 64 : router_(router) { |
68 } | 65 } |
69 | 66 |
70 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { | 67 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { |
71 } | 68 } |
72 | 69 |
73 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { | 70 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { |
74 return router_->HandleIncomingMessage(message); | 71 return router_->HandleIncomingMessage(message); |
75 } | 72 } |
76 | 73 |
77 // ---------------------------------------------------------------------------- | 74 // ---------------------------------------------------------------------------- |
78 | 75 |
79 Router::Router(ScopedMessagePipeHandle message_pipe, | 76 Router::Router(ScopedMessagePipeHandle message_pipe, |
80 FilterChain filters, | 77 FilterChain filters, |
81 const MojoAsyncWaiter* waiter) | 78 const MojoAsyncWaiter* waiter) |
82 : thunk_(this), | 79 : thunk_(this), |
83 filters_(std::move(filters)), | 80 filters_(std::move(filters)), |
84 connector_(std::move(message_pipe), | 81 connector_(std::move(message_pipe), |
85 Connector::SINGLE_THREADED_SEND, | 82 Connector::SINGLE_THREADED_SEND, |
86 waiter), | 83 waiter), |
87 weak_self_(this), | |
88 incoming_receiver_(nullptr), | 84 incoming_receiver_(nullptr), |
89 next_request_id_(0), | 85 next_request_id_(0), |
90 testing_mode_(false) { | 86 testing_mode_(false), |
| 87 weak_factory_(this) { |
91 filters_.SetSink(&thunk_); | 88 filters_.SetSink(&thunk_); |
92 connector_.set_incoming_receiver(filters_.GetHead()); | 89 connector_.set_incoming_receiver(filters_.GetHead()); |
93 } | 90 } |
94 | 91 |
95 Router::~Router() { | 92 Router::~Router() { |
96 weak_self_.set_value(nullptr); | 93 weak_factory_.InvalidateWeakPtrs(); |
97 | 94 |
98 for (auto& pair : responders_) | 95 for (auto& pair : async_responders_) |
| 96 delete pair.second; |
| 97 for (auto& pair : sync_responders_) |
99 delete pair.second; | 98 delete pair.second; |
100 } | 99 } |
101 | 100 |
102 bool Router::Accept(Message* message) { | 101 bool Router::Accept(Message* message) { |
103 DCHECK(thread_checker_.CalledOnValidThread()); | 102 DCHECK(thread_checker_.CalledOnValidThread()); |
104 DCHECK(!message->has_flag(kMessageExpectsResponse)); | 103 DCHECK(!message->has_flag(kMessageExpectsResponse)); |
105 return connector_.Accept(message); | 104 return connector_.Accept(message); |
106 } | 105 } |
107 | 106 |
108 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { | 107 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { |
109 DCHECK(thread_checker_.CalledOnValidThread()); | 108 DCHECK(thread_checker_.CalledOnValidThread()); |
110 DCHECK(message->has_flag(kMessageExpectsResponse)); | 109 DCHECK(message->has_flag(kMessageExpectsResponse)); |
111 | 110 |
112 // Reserve 0 in case we want it to convey special meaning in the future. | 111 // Reserve 0 in case we want it to convey special meaning in the future. |
113 uint64_t request_id = next_request_id_++; | 112 uint64_t request_id = next_request_id_++; |
114 if (request_id == 0) | 113 if (request_id == 0) |
115 request_id = next_request_id_++; | 114 request_id = next_request_id_++; |
116 | 115 |
117 message->set_request_id(request_id); | 116 message->set_request_id(request_id); |
118 if (!connector_.Accept(message)) | 117 if (!connector_.Accept(message)) |
119 return false; | 118 return false; |
120 | 119 |
121 // We assume ownership of |responder|. | 120 if (!message->has_flag(kMessageIsSync)) { |
122 responders_[request_id] = responder; | 121 // We assume ownership of |responder|. |
| 122 async_responders_[request_id] = responder; |
| 123 return true; |
| 124 } |
| 125 |
| 126 sync_responders_[request_id] = responder; |
| 127 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
| 128 for (;;) { |
| 129 // TODO(yzshen): Here we should allow incoming sync requests to re-enter and |
| 130 // block async messages. |
| 131 bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); |
| 132 // The message pipe has disconnected. |
| 133 if (!result) |
| 134 break; |
| 135 |
| 136 // This instance has been destroyed. |
| 137 if (!weak_self) |
| 138 break; |
| 139 |
| 140 // The corresponding response message has arrived. |
| 141 if (sync_responders_.find(request_id) == sync_responders_.end()) |
| 142 break; |
| 143 } |
| 144 |
| 145 // Return true means that we take ownership of |responder|. |
123 return true; | 146 return true; |
124 } | 147 } |
125 | 148 |
126 void Router::EnableTestingMode() { | 149 void Router::EnableTestingMode() { |
127 DCHECK(thread_checker_.CalledOnValidThread()); | 150 DCHECK(thread_checker_.CalledOnValidThread()); |
128 testing_mode_ = true; | 151 testing_mode_ = true; |
129 connector_.set_enforce_errors_from_incoming_receiver(false); | 152 connector_.set_enforce_errors_from_incoming_receiver(false); |
130 } | 153 } |
131 | 154 |
132 bool Router::HandleIncomingMessage(Message* message) { | 155 bool Router::HandleIncomingMessage(Message* message) { |
133 DCHECK(thread_checker_.CalledOnValidThread()); | 156 DCHECK(thread_checker_.CalledOnValidThread()); |
134 if (message->has_flag(kMessageExpectsResponse)) { | 157 if (message->has_flag(kMessageExpectsResponse)) { |
135 if (!incoming_receiver_) | 158 if (!incoming_receiver_) |
136 return false; | 159 return false; |
137 | 160 |
138 MessageReceiverWithStatus* responder = new ResponderThunk(weak_self_); | 161 MessageReceiverWithStatus* responder = |
| 162 new ResponderThunk(weak_factory_.GetWeakPtr()); |
139 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); | 163 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); |
140 if (!ok) | 164 if (!ok) |
141 delete responder; | 165 delete responder; |
142 return ok; | 166 return ok; |
143 | 167 |
144 } else if (message->has_flag(kMessageIsResponse)) { | 168 } else if (message->has_flag(kMessageIsResponse)) { |
| 169 ResponderMap& responder_map = message->has_flag(kMessageIsSync) |
| 170 ? sync_responders_ |
| 171 : async_responders_; |
145 uint64_t request_id = message->request_id(); | 172 uint64_t request_id = message->request_id(); |
146 ResponderMap::iterator it = responders_.find(request_id); | 173 ResponderMap::iterator it = responder_map.find(request_id); |
147 if (it == responders_.end()) { | 174 if (it == responder_map.end()) { |
148 DCHECK(testing_mode_); | 175 DCHECK(testing_mode_); |
149 return false; | 176 return false; |
150 } | 177 } |
151 MessageReceiver* responder = it->second; | 178 MessageReceiver* responder = it->second; |
152 responders_.erase(it); | 179 responder_map.erase(it); |
153 bool ok = responder->Accept(message); | 180 bool ok = responder->Accept(message); |
154 delete responder; | 181 delete responder; |
155 return ok; | 182 return ok; |
156 } else { | 183 } else { |
157 if (!incoming_receiver_) | 184 if (!incoming_receiver_) |
158 return false; | 185 return false; |
159 | 186 |
160 return incoming_receiver_->Accept(message); | 187 return incoming_receiver_->Accept(message); |
161 } | 188 } |
162 } | 189 } |
163 | 190 |
164 // ---------------------------------------------------------------------------- | 191 // ---------------------------------------------------------------------------- |
165 | 192 |
166 } // namespace internal | 193 } // namespace internal |
167 } // namespace mojo | 194 } // namespace mojo |
OLD | NEW |