Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(445)

Side by Side Diff: mojo/system/message_pipe.cc

Issue 23621056: Initial in-process implementation of some Mojo primitives. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: build fix Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/message_pipe.h"
6
7 #include "base/logging.h"
8 #include "base/stl_util.h"
9 #include "mojo/system/limits.h"
10 #include "mojo/system/memory.h"
11
12 namespace mojo {
13 namespace system {
14
15 namespace {
16
17 unsigned DestinationPortFromSourcePort(unsigned port) {
18 DCHECK(port == 0 || port == 1);
19 return port ^ 1;
20 }
21
22 } // namespace
23
24 MessagePipe::MessagePipe() {
25 is_open_[0] = is_open_[1] = true;
26 }
27
28 void MessagePipe::CancelAllWaiters(unsigned port) {
29 DCHECK(port == 0 || port == 1);
30
31 base::AutoLock locker(lock_);
32 DCHECK(is_open_[port]);
33
34 waiter_lists_[port].CancelAllWaiters();
35 }
36
37 void MessagePipe::Close(unsigned port) {
38 DCHECK(port == 0 || port == 1);
39
40 unsigned destination_port = DestinationPortFromSourcePort(port);
41
42 base::AutoLock locker(lock_);
43 DCHECK(is_open_[port]);
44
45 // Record the old state of the other (destination) port, so we can tell if it
46 // changes.
47 // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we
48 // don't have to do this.
49 MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
50 MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
51 bool dest_is_open = is_open_[destination_port];
52 if (dest_is_open) {
53 old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
54 old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
55 }
56
57 is_open_[port] = false;
58 STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
59
60 // Notify the other (destination) port if its state has changed.
61 if (dest_is_open) {
62 MojoWaitFlags new_dest_satisfied_flags =
63 SatisfiedFlagsNoLock(destination_port);
64 MojoWaitFlags new_dest_satisfiable_flags =
65 SatisfiableFlagsNoLock(destination_port);
66 if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
67 new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
68 waiter_lists_[destination_port].AwakeWaitersForStateChange(
69 new_dest_satisfied_flags, new_dest_satisfiable_flags);
70 }
71 }
72 }
73
74 MojoResult MessagePipe::WriteMessage(
75 unsigned port,
76 const void* bytes, uint32_t num_bytes,
77 const MojoHandle* handles, uint32_t num_handles,
78 MojoWriteMessageFlags /*flags*/) {
79 DCHECK(port == 0 || port == 1);
80
81 unsigned destination_port = DestinationPortFromSourcePort(port);
82
83 if (!VerifyUserPointer(bytes, num_bytes, 1))
84 return MOJO_RESULT_INVALID_ARGUMENT;
85 if (num_bytes > kMaxMessageNumBytes)
86 return MOJO_RESULT_RESOURCE_EXHAUSTED;
87
88 if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0])))
89 return MOJO_RESULT_INVALID_ARGUMENT;
90 if (num_handles > kMaxMessageNumHandles)
91 return MOJO_RESULT_RESOURCE_EXHAUSTED;
92 if (num_handles > 0) {
93 // TODO(vtl): Verify each handle.
94 NOTIMPLEMENTED();
95 return MOJO_RESULT_UNIMPLEMENTED;
96 }
97
98 // TODO(vtl): Handle flags.
99
100 base::AutoLock locker(lock_);
101 DCHECK(is_open_[port]);
102
103 // The destination port need not be open, unlike the source port.
104 if (!is_open_[destination_port])
105 return MOJO_RESULT_FAILED_PRECONDITION;
106
107 bool dest_was_empty = message_queues_[destination_port].empty();
108
109 // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
110 message_queues_[destination_port].push_back(
111 new MessageInTransit(bytes, num_bytes));
112 // TODO(vtl): Support sending handles.
113
114 // The other (destination) port was empty and now isn't, so it should now be
115 // readable. Wake up anyone waiting on this.
116 if (dest_was_empty) {
117 waiter_lists_[destination_port].AwakeWaitersForStateChange(
118 SatisfiedFlagsNoLock(destination_port),
119 SatisfiableFlagsNoLock(destination_port));
120 }
121
122 return MOJO_RESULT_OK;
123 }
124
125 MojoResult MessagePipe::ReadMessage(unsigned port,
126 void* bytes, uint32_t* num_bytes,
127 MojoHandle* handles, uint32_t* num_handles,
128 MojoReadMessageFlags flags) {
129 DCHECK(port == 0 || port == 1);
130
131 const size_t max_bytes = num_bytes ? *num_bytes : 0;
132 if (!VerifyUserPointer(bytes, max_bytes, 1))
133 return MOJO_RESULT_INVALID_ARGUMENT;
134
135 const size_t max_handles = num_handles ? *num_handles : 0;
136 if (!VerifyUserPointer(handles, max_handles, sizeof(handles[0])))
137 return MOJO_RESULT_INVALID_ARGUMENT;
138
139 base::AutoLock locker(lock_);
140 DCHECK(is_open_[port]);
141
142 if (message_queues_[port].empty())
143 return MOJO_RESULT_NOT_FOUND;
144
145 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
146 // and release the lock immediately.
147 bool not_enough_space = false;
148 MessageInTransit* const message = message_queues_[port].front();
149 const size_t message_size = message->data.size();
150 if (num_bytes)
151 *num_bytes = static_cast<uint32_t>(message_size);
152 if (message_size <= max_bytes)
153 memcpy(bytes, message->data.data(), message_size);
154 else
155 not_enough_space = true;
156
157 // TODO(vtl): Support receiving handles.
158 if (num_handles)
159 *num_handles = 0;
160
161 if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
162 message_queues_[port].pop_front();
163 delete message;
164
165 // Now it's empty, thus no longer readable.
166 if (message_queues_[port].empty()) {
167 // It's currently not possible to wait for non-readability, but we should
168 // do the state change anyway.
169 waiter_lists_[port].AwakeWaitersForStateChange(
170 SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
171 }
172 }
173
174 if (not_enough_space)
175 return MOJO_RESULT_RESOURCE_EXHAUSTED;
176
177 return MOJO_RESULT_OK;
178 }
179
180 MojoResult MessagePipe::AddWaiter(unsigned port,
181 Waiter* waiter,
182 MojoWaitFlags flags,
183 MojoResult wake_result) {
184 DCHECK(port == 0 || port == 1);
185
186 base::AutoLock locker(lock_);
187 DCHECK(is_open_[port]);
188
189 if ((flags & SatisfiedFlagsNoLock(port)))
190 return MOJO_RESULT_ALREADY_EXISTS;
191 if (!(flags & SatisfiableFlagsNoLock(port)))
192 return MOJO_RESULT_FAILED_PRECONDITION;
193
194 waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
195 return MOJO_RESULT_OK;
196 }
197
198 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
199 DCHECK(port == 0 || port == 1);
200
201 base::AutoLock locker(lock_);
202 DCHECK(is_open_[port]);
203
204 waiter_lists_[port].RemoveWaiter(waiter);
205 }
206
207 MessagePipe::~MessagePipe() {
208 // Owned by the dispatchers. The owning dispatchers should only release us via
209 // their |Close()| method, which should inform us of being closed via our
210 // |Close()|. Thus these should already be null.
211 DCHECK(!is_open_[0]);
212 DCHECK(!is_open_[1]);
213 }
214
215 MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
216 DCHECK(port == 0 || port == 1);
217
218 unsigned destination_port = DestinationPortFromSourcePort(port);
219
220 lock_.AssertAcquired();
221
222 MojoWaitFlags satisfied_flags = 0;
223 if (!message_queues_[port].empty())
224 satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
225 if (is_open_[destination_port])
226 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
227
228 return satisfied_flags;
229 }
230
231 MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
232 DCHECK(port == 0 || port == 1);
233
234 unsigned destination_port = DestinationPortFromSourcePort(port);
235
236 lock_.AssertAcquired();
237
238 MojoWaitFlags satisfiable_flags = 0;
239 if (!message_queues_[port].empty() || is_open_[destination_port])
240 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
241 if (is_open_[destination_port])
242 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
243
244 return satisfiable_flags;
245 }
246
247 } // namespace system
248 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698