OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/message_pipe.h" | 5 #include "mojo/system/message_pipe.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "mojo/system/channel_endpoint.h" | 8 #include "mojo/system/channel_endpoint.h" |
9 #include "mojo/system/local_message_pipe_endpoint.h" | 9 #include "mojo/system/local_message_pipe_endpoint.h" |
10 #include "mojo/system/message_in_transit.h" | 10 #include "mojo/system/message_in_transit.h" |
11 #include "mojo/system/message_pipe_dispatcher.h" | 11 #include "mojo/system/message_pipe_dispatcher.h" |
12 #include "mojo/system/message_pipe_endpoint.h" | 12 #include "mojo/system/message_pipe_endpoint.h" |
13 #include "mojo/system/proxy_message_pipe_endpoint.h" | 13 #include "mojo/system/proxy_message_pipe_endpoint.h" |
14 | 14 |
15 namespace mojo { | 15 namespace mojo { |
16 namespace system { | 16 namespace system { |
17 | 17 |
18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0, | 18 // static |
19 scoped_ptr<MessagePipeEndpoint> endpoint1) { | 19 MessagePipe* MessagePipe::CreateLocalLocal() { |
20 endpoints_[0].reset(endpoint0.release()); | 20 MessagePipe* message_pipe = new MessagePipe(); |
21 endpoints_[1].reset(endpoint1.release()); | 21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 23 return message_pipe; |
22 } | 24 } |
23 | 25 |
24 // static | 26 // static |
25 MessagePipe* MessagePipe::CreateLocalLocal() { | 27 MessagePipe* MessagePipe::CreateLocalProxy( |
26 return new MessagePipe( | 28 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
27 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint), | 29 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. |
28 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint)); | 30 MessagePipe* message_pipe = new MessagePipe(); |
| 31 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 32 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); |
| 33 message_pipe->endpoints_[1].reset( |
| 34 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
| 35 return message_pipe; |
29 } | 36 } |
30 | 37 |
31 // static | 38 // static |
32 MessagePipe* MessagePipe::CreateLocalProxy() { | 39 MessagePipe* MessagePipe::CreateProxyLocal( |
33 return new MessagePipe( | 40 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
34 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint), | 41 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. |
35 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint)); | 42 MessagePipe* message_pipe = new MessagePipe(); |
| 43 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); |
| 44 message_pipe->endpoints_[0].reset( |
| 45 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
| 46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 47 return message_pipe; |
36 } | 48 } |
37 | 49 |
38 // static | 50 // static |
39 MessagePipe* MessagePipe::CreateProxyLocal() { | |
40 return new MessagePipe( | |
41 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint), | |
42 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint)); | |
43 } | |
44 | |
45 // static | |
46 unsigned MessagePipe::GetPeerPort(unsigned port) { | 51 unsigned MessagePipe::GetPeerPort(unsigned port) { |
47 DCHECK(port == 0 || port == 1); | 52 DCHECK(port == 0 || port == 1); |
48 return port ^ 1; | 53 return port ^ 1; |
49 } | 54 } |
50 | 55 |
51 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | 56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
52 DCHECK(port == 0 || port == 1); | 57 DCHECK(port == 0 || port == 1); |
53 base::AutoLock locker(lock_); | 58 base::AutoLock locker(lock_); |
54 DCHECK(endpoints_[port]); | 59 DCHECK(endpoints_[port]); |
55 | 60 |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
158 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a | 163 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a |
159 // |MessagePipe| with two proxy endpoints, which will then act as a proxy | 164 // |MessagePipe| with two proxy endpoints, which will then act as a proxy |
160 // (rather than trying to connect the two ends directly). | 165 // (rather than trying to connect the two ends directly). |
161 DLOG_IF(WARNING, | 166 DLOG_IF(WARNING, |
162 is_peer_open && | 167 is_peer_open && |
163 endpoints_[GetPeerPort(port)]->GetType() != | 168 endpoints_[GetPeerPort(port)]->GetType() != |
164 MessagePipeEndpoint::kTypeLocal) | 169 MessagePipeEndpoint::kTypeLocal) |
165 << "Direct message pipe passing across multiple channels not yet " | 170 << "Direct message pipe passing across multiple channels not yet " |
166 "implemented; will proxy"; | 171 "implemented; will proxy"; |
167 | 172 |
| 173 scoped_refptr<ChannelEndpoint> channel_endpoint( |
| 174 new ChannelEndpoint(this, port)); |
168 scoped_ptr<MessagePipeEndpoint> replacement_endpoint( | 175 scoped_ptr<MessagePipeEndpoint> replacement_endpoint( |
169 new ProxyMessagePipeEndpoint( | 176 new ProxyMessagePipeEndpoint( |
| 177 channel_endpoint.get(), |
170 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()), | 178 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()), |
171 is_peer_open)); | 179 is_peer_open)); |
172 endpoints_[port].swap(replacement_endpoint); | 180 endpoints_[port].swap(replacement_endpoint); |
173 | 181 |
174 return make_scoped_refptr(new ChannelEndpoint(this, port)); | 182 return channel_endpoint; |
175 } | 183 } |
176 | 184 |
177 MojoResult MessagePipe::EnqueueMessage(unsigned port, | 185 MojoResult MessagePipe::EnqueueMessage(unsigned port, |
178 scoped_ptr<MessageInTransit> message) { | 186 scoped_ptr<MessageInTransit> message) { |
179 return EnqueueMessageInternal(port, message.Pass(), NULL); | 187 return EnqueueMessageInternal(port, message.Pass(), NULL); |
180 } | 188 } |
181 | 189 |
182 bool MessagePipe::Attach(unsigned port, ChannelEndpoint* channel_endpoint) { | 190 bool MessagePipe::Attach(unsigned port, ChannelEndpoint* channel_endpoint) { |
183 DCHECK(port == 0 || port == 1); | 191 DCHECK(port == 0 || port == 1); |
184 DCHECK(channel_endpoint); | 192 DCHECK(channel_endpoint); |
185 | 193 |
186 base::AutoLock locker(lock_); | 194 base::AutoLock locker(lock_); |
187 if (!endpoints_[port]) | 195 if (!endpoints_[port]) |
188 return false; | 196 return false; |
189 | 197 |
190 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy); | 198 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy); |
191 endpoints_[port]->Attach(channel_endpoint); | |
192 return true; | 199 return true; |
193 } | 200 } |
194 | 201 |
195 void MessagePipe::Run(unsigned port) { | 202 void MessagePipe::Run(unsigned port) { |
196 DCHECK(port == 0 || port == 1); | 203 DCHECK(port == 0 || port == 1); |
197 | 204 |
198 base::AutoLock locker(lock_); | 205 base::AutoLock locker(lock_); |
199 DCHECK(endpoints_[port]); | 206 DCHECK(endpoints_[port]); |
200 if (!endpoints_[port]->Run()) | 207 if (!endpoints_[port]->Run()) |
201 endpoints_[port].reset(); | 208 endpoints_[port].reset(); |
202 } | 209 } |
203 | 210 |
204 void MessagePipe::OnRemove(unsigned port) { | 211 void MessagePipe::OnRemove(unsigned port) { |
205 unsigned destination_port = GetPeerPort(port); | 212 unsigned destination_port = GetPeerPort(port); |
206 | 213 |
207 base::AutoLock locker(lock_); | 214 base::AutoLock locker(lock_); |
208 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called. | 215 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called. |
209 if (!endpoints_[port]) | 216 if (!endpoints_[port]) |
210 return; | 217 return; |
211 | 218 |
212 endpoints_[port]->OnRemove(); | 219 endpoints_[port]->OnRemove(); |
213 if (endpoints_[destination_port]) { | 220 if (endpoints_[destination_port]) { |
214 if (!endpoints_[destination_port]->OnPeerClose()) | 221 if (!endpoints_[destination_port]->OnPeerClose()) |
215 endpoints_[destination_port].reset(); | 222 endpoints_[destination_port].reset(); |
216 } | 223 } |
217 endpoints_[port].reset(); | 224 endpoints_[port].reset(); |
218 } | 225 } |
219 | 226 |
| 227 MessagePipe::MessagePipe() { |
| 228 } |
| 229 |
220 MessagePipe::~MessagePipe() { | 230 MessagePipe::~MessagePipe() { |
221 // Owned by the dispatchers. The owning dispatchers should only release us via | 231 // Owned by the dispatchers. The owning dispatchers should only release us via |
222 // their |Close()| method, which should inform us of being closed via our | 232 // their |Close()| method, which should inform us of being closed via our |
223 // |Close()|. Thus these should already be null. | 233 // |Close()|. Thus these should already be null. |
224 DCHECK(!endpoints_[0]); | 234 DCHECK(!endpoints_[0]); |
225 DCHECK(!endpoints_[1]); | 235 DCHECK(!endpoints_[1]); |
226 } | 236 } |
227 | 237 |
228 MojoResult MessagePipe::EnqueueMessageInternal( | 238 MojoResult MessagePipe::EnqueueMessageInternal( |
229 unsigned port, | 239 unsigned port, |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
304 MojoResult MessagePipe::HandleControlMessage( | 314 MojoResult MessagePipe::HandleControlMessage( |
305 unsigned /*port*/, | 315 unsigned /*port*/, |
306 scoped_ptr<MessageInTransit> message) { | 316 scoped_ptr<MessageInTransit> message) { |
307 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " | 317 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " |
308 << message->subtype(); | 318 << message->subtype(); |
309 return MOJO_RESULT_UNKNOWN; | 319 return MOJO_RESULT_UNKNOWN; |
310 } | 320 } |
311 | 321 |
312 } // namespace system | 322 } // namespace system |
313 } // namespace mojo | 323 } // namespace mojo |
OLD | NEW |