Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(32)

Side by Side Diff: mojo/system/message_pipe.cc

Issue 23621056: Initial in-process implementation of some Mojo primitives. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: wip18.1 Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/message_pipe.h"
6
7 #include "base/logging.h"
8 #include "base/stl_util.h"
9 #include "mojo/system/limits.h"
10 #include "mojo/system/memory.h"
11
12 namespace mojo {
13 namespace system {
14
15 namespace {
16
17 unsigned DestinationPortFromSourcePort(unsigned port) {
18 DCHECK(port == 0 || port == 1);
19 return port ^ 1;
20 }
21
22 } // namespace
23
24 MessagePipe::MessagePipe() {
25 is_open_[0] = is_open_[1] = true;
26 }
27
28 void MessagePipe::CancelAllWaiters(unsigned port) {
29 DCHECK(port == 0 || port == 1);
30
31 base::AutoLock locker(lock_);
32 DCHECK(is_open_[port]);
33
34 waiter_lists_[port].CancelAllWaiters();
35 }
36
37 void MessagePipe::Close(unsigned port) {
38 DCHECK(port == 0 || port == 1);
39
40 unsigned destination_port = DestinationPortFromSourcePort(port);
41
42 base::AutoLock locker(lock_);
43 DCHECK(is_open_[port]);
44
45 // Record the old state of the other (destination) port, so we can tell if it
46 // changes.
47 // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we
48 // don't have to do this.
49 MojoWaitFlags old_dest_satisfied_flags;
50 MojoWaitFlags old_dest_satisfiable_flags;
51 if (is_open_[destination_port]) {
52 old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
53 old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
54 }
55
56 is_open_[port] = false;
57 STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
58
59 // Notify the other (destination) port if its state has changed.
60 if (is_open_[destination_port]) {
61 MojoWaitFlags new_dest_satisfied_flags =
62 SatisfiedFlagsNoLock(destination_port);
63 MojoWaitFlags new_dest_satisfiable_flags =
64 SatisfiableFlagsNoLock(destination_port);
65 if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
66 new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
67 waiter_lists_[destination_port].AwakeWaitersForStateChange(
68 new_dest_satisfied_flags, new_dest_satisfiable_flags);
69 }
70 }
71 }
72
73 MojoResult MessagePipe::WriteMessage(
74 unsigned port,
75 const void* bytes, uint32_t num_bytes,
76 const MojoHandle* handles, uint32_t num_handles,
77 MojoWriteMessageFlags /*flags*/) {
78 DCHECK(port == 0 || port == 1);
79
80 unsigned destination_port = DestinationPortFromSourcePort(port);
81
82 if (!VerifyUserPointer(bytes, num_bytes, 1))
83 return MOJO_RESULT_INVALID_ARGUMENT;
84 if (num_bytes > kMaxMessageNumBytes)
85 return MOJO_RESULT_OUT_OF_RANGE;
86
87 if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0])))
88 return MOJO_RESULT_INVALID_ARGUMENT;
89 if (num_handles > kMaxMessageNumHandles)
90 return MOJO_RESULT_OUT_OF_RANGE;
91 if (num_handles > 0) {
92 // TODO(vtl): Verify each handle.
93 NOTIMPLEMENTED();
94 return MOJO_RESULT_UNIMPLEMENTED;
95 }
96
97 // TODO(vtl): Handle flags.
98
99 base::AutoLock locker(lock_);
100 DCHECK(is_open_[port]);
101
102 // The destination port need not be open, unlike the source port.
103 if (!is_open_[destination_port])
104 return MOJO_RESULT_FAILED_PRECONDITION;
105
106 bool dest_was_empty = message_queues_[destination_port].empty();
107
108 // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
109 message_queues_[destination_port].push_back(
110 new MessageInTransit(bytes, num_bytes));
111 // TODO(vtl): Support sending handles.
112
113 // The other (destination) port was empty and now isn't, so it should now be
114 // readable. Wake up anyone waiting on this.
115 if (dest_was_empty) {
116 waiter_lists_[destination_port].AwakeWaitersForStateChange(
117 SatisfiedFlagsNoLock(destination_port),
118 SatisfiableFlagsNoLock(destination_port));
119 }
120
121 return MOJO_RESULT_OK;
122 }
123
124 MojoResult MessagePipe::ReadMessage(unsigned port,
125 void* bytes, uint32_t* num_bytes,
126 MojoHandle* handles, uint32_t* num_handles,
127 MojoReadMessageFlags flags) {
128 DCHECK(port == 0 || port == 1);
129
130 const size_t max_bytes = num_bytes ? *num_bytes : 0;
131 if (!VerifyUserPointer(bytes, max_bytes, 1))
132 return MOJO_RESULT_INVALID_ARGUMENT;
133
134 const size_t max_handles = num_handles ? *num_handles : 0;
135 if (!VerifyUserPointer(handles, max_handles, sizeof(handles[0])))
136 return MOJO_RESULT_INVALID_ARGUMENT;
137
138 base::AutoLock locker(lock_);
139 DCHECK(is_open_[port]);
140
141 if (message_queues_[port].empty())
142 return MOJO_RESULT_NOT_FOUND;
143
144 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
145 // and release the lock immediately.
146 bool not_enough_space = false;
147 MessageInTransit* const message = message_queues_[port].front();
148 const size_t message_size = message->data.size();
149 if (num_bytes)
150 *num_bytes = static_cast<uint32_t>(message_size);
151 if (message_size <= max_bytes)
152 memcpy(bytes, message->data.data(), message_size);
153 else
154 not_enough_space = true;
155
156 // TODO(vtl): Support receiving handles.
157 if (num_handles)
158 *num_handles = 0;
159
160 if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
161 message_queues_[port].pop_front();
162 delete message;
163
164 // Now it's empty, thus no longer readable.
165 if (message_queues_[port].empty()) {
166 // It's currently not possible to wait for non-readability, but we should
167 // do the state change anyway.
168 waiter_lists_[port].AwakeWaitersForStateChange(
169 SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
170 }
171 }
172
173 if (not_enough_space)
174 return MOJO_RESULT_OUT_OF_RANGE;
175
176 return MOJO_RESULT_OK;
177 }
178
179 MojoResult MessagePipe::AddWaiter(unsigned port,
180 Waiter* waiter,
181 MojoWaitFlags flags,
182 MojoResult wake_result) {
183 DCHECK(port == 0 || port == 1);
184
185 base::AutoLock locker(lock_);
186 DCHECK(is_open_[port]);
187
188 if ((flags & SatisfiedFlagsNoLock(port)))
189 return MOJO_RESULT_ALREADY_EXISTS;
190 if (!(flags & SatisfiableFlagsNoLock(port)))
191 return MOJO_RESULT_FAILED_PRECONDITION;
192
193 waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
194 return MOJO_RESULT_OK;
195 }
196
197 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
198 DCHECK(port == 0 || port == 1);
199
200 base::AutoLock locker(lock_);
201 DCHECK(is_open_[port]);
202
203 waiter_lists_[port].RemoveWaiter(waiter);
204 }
205
206 MessagePipe::~MessagePipe() {
207 // Owned by the dispatchers. The owning dispatchers should only release us via
208 // their |Close()| method, which should inform us of being closed via our
209 // |Close()|. Thus these should already be null.
210 DCHECK(!is_open_[0]);
211 DCHECK(!is_open_[1]);
212 }
213
214 MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
215 DCHECK(port == 0 || port == 1);
216
217 unsigned destination_port = DestinationPortFromSourcePort(port);
218
219 lock_.AssertAcquired();
220
221 MojoWaitFlags satisfied_flags = 0;
222 if (!message_queues_[port].empty())
223 satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
224 if (is_open_[destination_port])
225 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
226
227 return satisfied_flags;
228 }
229
230 MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
231 DCHECK(port == 0 || port == 1);
232
233 unsigned destination_port = DestinationPortFromSourcePort(port);
234
235 lock_.AssertAcquired();
236
237 MojoWaitFlags satisfiable_flags = 0;
238 if (!message_queues_[port].empty() || is_open_[destination_port])
239 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
240 if (is_open_[destination_port])
241 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
242
243 return satisfiable_flags;
244 }
245
246 } // namespace system
247 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698