Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(114)

Side by Side Diff: mojo/system/message_pipe.cc

Issue 621153003: Move mojo edk into mojo/edk (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix checkdeps Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/system/message_pipe.h ('k') | mojo/system/message_pipe_dispatcher.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/message_pipe.h"
6
7 #include "base/logging.h"
8 #include "mojo/system/channel_endpoint.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
14
15 namespace mojo {
16 namespace system {
17
18 // static
19 MessagePipe* MessagePipe::CreateLocalLocal() {
20 MessagePipe* message_pipe = new MessagePipe();
21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
23 return message_pipe;
24 }
25
26 // static
27 MessagePipe* MessagePipe::CreateLocalProxy(
28 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
29 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
30 MessagePipe* message_pipe = new MessagePipe();
31 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
32 *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
33 message_pipe->endpoints_[1].reset(
34 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
35 return message_pipe;
36 }
37
38 // static
39 MessagePipe* MessagePipe::CreateProxyLocal(
40 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
41 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
42 MessagePipe* message_pipe = new MessagePipe();
43 *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
44 message_pipe->endpoints_[0].reset(
45 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
47 return message_pipe;
48 }
49
50 // static
51 unsigned MessagePipe::GetPeerPort(unsigned port) {
52 DCHECK(port == 0 || port == 1);
53 return port ^ 1;
54 }
55
56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
57 DCHECK(port == 0 || port == 1);
58 base::AutoLock locker(lock_);
59 DCHECK(endpoints_[port]);
60
61 return endpoints_[port]->GetType();
62 }
63
64 void MessagePipe::CancelAllWaiters(unsigned port) {
65 DCHECK(port == 0 || port == 1);
66
67 base::AutoLock locker(lock_);
68 DCHECK(endpoints_[port]);
69 endpoints_[port]->CancelAllWaiters();
70 }
71
72 void MessagePipe::Close(unsigned port) {
73 DCHECK(port == 0 || port == 1);
74
75 unsigned destination_port = GetPeerPort(port);
76
77 base::AutoLock locker(lock_);
78 // The endpoint's |OnPeerClose()| may have been called first and returned
79 // false, which would have resulted in its destruction.
80 if (!endpoints_[port])
81 return;
82
83 endpoints_[port]->Close();
84 if (endpoints_[destination_port]) {
85 if (!endpoints_[destination_port]->OnPeerClose())
86 endpoints_[destination_port].reset();
87 }
88 endpoints_[port].reset();
89 }
90
91 // TODO(vtl): Handle flags.
92 MojoResult MessagePipe::WriteMessage(
93 unsigned port,
94 UserPointer<const void> bytes,
95 uint32_t num_bytes,
96 std::vector<DispatcherTransport>* transports,
97 MojoWriteMessageFlags flags) {
98 DCHECK(port == 0 || port == 1);
99 return EnqueueMessageInternal(
100 GetPeerPort(port),
101 make_scoped_ptr(new MessageInTransit(
102 MessageInTransit::kTypeMessagePipeEndpoint,
103 MessageInTransit::kSubtypeMessagePipeEndpointData,
104 num_bytes,
105 bytes)),
106 transports);
107 }
108
109 MojoResult MessagePipe::ReadMessage(unsigned port,
110 UserPointer<void> bytes,
111 UserPointer<uint32_t> num_bytes,
112 DispatcherVector* dispatchers,
113 uint32_t* num_dispatchers,
114 MojoReadMessageFlags flags) {
115 DCHECK(port == 0 || port == 1);
116
117 base::AutoLock locker(lock_);
118 DCHECK(endpoints_[port]);
119
120 return endpoints_[port]->ReadMessage(
121 bytes, num_bytes, dispatchers, num_dispatchers, flags);
122 }
123
124 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
125 DCHECK(port == 0 || port == 1);
126
127 base::AutoLock locker(const_cast<base::Lock&>(lock_));
128 DCHECK(endpoints_[port]);
129
130 return endpoints_[port]->GetHandleSignalsState();
131 }
132
133 MojoResult MessagePipe::AddWaiter(unsigned port,
134 Waiter* waiter,
135 MojoHandleSignals signals,
136 uint32_t context,
137 HandleSignalsState* signals_state) {
138 DCHECK(port == 0 || port == 1);
139
140 base::AutoLock locker(lock_);
141 DCHECK(endpoints_[port]);
142
143 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
144 }
145
146 void MessagePipe::RemoveWaiter(unsigned port,
147 Waiter* waiter,
148 HandleSignalsState* signals_state) {
149 DCHECK(port == 0 || port == 1);
150
151 base::AutoLock locker(lock_);
152 DCHECK(endpoints_[port]);
153
154 endpoints_[port]->RemoveWaiter(waiter, signals_state);
155 }
156
157 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
158 DCHECK(port == 0 || port == 1);
159
160 base::AutoLock locker(lock_);
161 DCHECK(endpoints_[port]);
162 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
163
164 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
165 // |MessagePipe| with two proxy endpoints, which will then act as a proxy
166 // (rather than trying to connect the two ends directly).
167 DLOG_IF(WARNING,
168 !!endpoints_[GetPeerPort(port)] &&
169 endpoints_[GetPeerPort(port)]->GetType() !=
170 MessagePipeEndpoint::kTypeLocal)
171 << "Direct message pipe passing across multiple channels not yet "
172 "implemented; will proxy";
173
174 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
175 scoped_refptr<ChannelEndpoint> channel_endpoint(
176 new ChannelEndpoint(this, port));
177 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
178 channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>(
179 old_endpoint.get())->message_queue());
180 old_endpoint->Close();
181
182 return channel_endpoint;
183 }
184
185 MojoResult MessagePipe::EnqueueMessage(unsigned port,
186 scoped_ptr<MessageInTransit> message) {
187 return EnqueueMessageInternal(port, message.Pass(), nullptr);
188 }
189
190 MessagePipe::MessagePipe() {
191 }
192
193 MessagePipe::~MessagePipe() {
194 // Owned by the dispatchers. The owning dispatchers should only release us via
195 // their |Close()| method, which should inform us of being closed via our
196 // |Close()|. Thus these should already be null.
197 DCHECK(!endpoints_[0]);
198 DCHECK(!endpoints_[1]);
199 }
200
201 MojoResult MessagePipe::EnqueueMessageInternal(
202 unsigned port,
203 scoped_ptr<MessageInTransit> message,
204 std::vector<DispatcherTransport>* transports) {
205 DCHECK(port == 0 || port == 1);
206 DCHECK(message);
207
208 if (message->type() == MessageInTransit::kTypeMessagePipe) {
209 DCHECK(!transports);
210 return HandleControlMessage(port, message.Pass());
211 }
212
213 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
214
215 base::AutoLock locker(lock_);
216 DCHECK(endpoints_[GetPeerPort(port)]);
217
218 // The destination port need not be open, unlike the source port.
219 if (!endpoints_[port])
220 return MOJO_RESULT_FAILED_PRECONDITION;
221
222 if (transports) {
223 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
224 if (result != MOJO_RESULT_OK)
225 return result;
226 }
227
228 // The endpoint's |EnqueueMessage()| may not report failure.
229 endpoints_[port]->EnqueueMessage(message.Pass());
230 return MOJO_RESULT_OK;
231 }
232
233 MojoResult MessagePipe::AttachTransportsNoLock(
234 unsigned port,
235 MessageInTransit* message,
236 std::vector<DispatcherTransport>* transports) {
237 DCHECK(!message->has_dispatchers());
238
239 // You're not allowed to send either handle to a message pipe over the message
240 // pipe, so check for this. (The case of trying to write a handle to itself is
241 // taken care of by |Core|. That case kind of makes sense, but leads to
242 // complications if, e.g., both sides try to do the same thing with their
243 // respective handles simultaneously. The other case, of trying to write the
244 // peer handle to a handle, doesn't make sense -- since no handle will be
245 // available to read the message from.)
246 for (size_t i = 0; i < transports->size(); i++) {
247 if (!(*transports)[i].is_valid())
248 continue;
249 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
250 MessagePipeDispatcherTransport mp_transport((*transports)[i]);
251 if (mp_transport.GetMessagePipe() == this) {
252 // The other case should have been disallowed by |Core|. (Note: |port|
253 // is the peer port of the handle given to |WriteMessage()|.)
254 DCHECK_EQ(mp_transport.GetPort(), port);
255 return MOJO_RESULT_INVALID_ARGUMENT;
256 }
257 }
258 }
259
260 // Clone the dispatchers and attach them to the message. (This must be done as
261 // a separate loop, since we want to leave the dispatchers alone on failure.)
262 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
263 dispatchers->reserve(transports->size());
264 for (size_t i = 0; i < transports->size(); i++) {
265 if ((*transports)[i].is_valid()) {
266 dispatchers->push_back(
267 (*transports)[i].CreateEquivalentDispatcherAndClose());
268 } else {
269 LOG(WARNING) << "Enqueueing null dispatcher";
270 dispatchers->push_back(scoped_refptr<Dispatcher>());
271 }
272 }
273 message->SetDispatchers(dispatchers.Pass());
274 return MOJO_RESULT_OK;
275 }
276
277 MojoResult MessagePipe::HandleControlMessage(
278 unsigned /*port*/,
279 scoped_ptr<MessageInTransit> message) {
280 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
281 << message->subtype();
282 return MOJO_RESULT_UNKNOWN;
283 }
284
285 } // namespace system
286 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/message_pipe.h ('k') | mojo/system/message_pipe_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698