OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/message_pipe.h" | 5 #include "mojo/system/message_pipe.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "base/stl_util.h" | 8 #include "base/stl_util.h" |
| 9 #include "mojo/system/local_message_pipe_endpoint.h" |
9 #include "mojo/system/message_in_transit.h" | 10 #include "mojo/system/message_in_transit.h" |
| 11 #include "mojo/system/message_pipe_endpoint.h" |
10 | 12 |
11 namespace mojo { | 13 namespace mojo { |
12 namespace system { | 14 namespace system { |
13 | 15 |
14 namespace { | 16 namespace { |
15 | 17 |
16 unsigned DestinationPortFromSourcePort(unsigned port) { | 18 unsigned DestinationPortFromSourcePort(unsigned port) { |
17 DCHECK(port == 0 || port == 1); | 19 DCHECK(port == 0 || port == 1); |
18 return port ^ 1; | 20 return port ^ 1; |
19 } | 21 } |
20 | 22 |
21 } // namespace | 23 } // namespace |
22 | 24 |
| 25 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0, |
| 26 scoped_ptr<MessagePipeEndpoint> endpoint_1) { |
| 27 endpoints_[0].reset(endpoint_0.release()); |
| 28 endpoints_[1].reset(endpoint_1.release()); |
| 29 } |
| 30 |
23 MessagePipe::MessagePipe() { | 31 MessagePipe::MessagePipe() { |
24 is_open_[0] = is_open_[1] = true; | 32 endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 33 endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
25 } | 34 } |
26 | 35 |
27 void MessagePipe::CancelAllWaiters(unsigned port) { | 36 void MessagePipe::CancelAllWaiters(unsigned port) { |
28 DCHECK(port == 0 || port == 1); | 37 DCHECK(port == 0 || port == 1); |
29 | 38 |
30 base::AutoLock locker(lock_); | 39 base::AutoLock locker(lock_); |
31 DCHECK(is_open_[port]); | 40 DCHECK(endpoints_[port].get()); |
32 | 41 endpoints_[port]->CancelAllWaiters(); |
33 waiter_lists_[port].CancelAllWaiters(); | |
34 } | 42 } |
35 | 43 |
36 void MessagePipe::Close(unsigned port) { | 44 void MessagePipe::Close(unsigned port) { |
37 DCHECK(port == 0 || port == 1); | 45 DCHECK(port == 0 || port == 1); |
38 | 46 |
39 unsigned destination_port = DestinationPortFromSourcePort(port); | 47 unsigned destination_port = DestinationPortFromSourcePort(port); |
40 | 48 |
41 base::AutoLock locker(lock_); | 49 base::AutoLock locker(lock_); |
42 DCHECK(is_open_[port]); | 50 DCHECK(endpoints_[port].get()); |
43 | 51 |
44 // Record the old state of the other (destination) port, so we can tell if it | 52 endpoints_[port]->Close(); |
45 // changes. | 53 if (endpoints_[destination_port].get()) |
46 // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we | 54 endpoints_[destination_port]->OnPeerClose(); |
47 // don't have to do this. | |
48 MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE; | |
49 MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE; | |
50 bool dest_is_open = is_open_[destination_port]; | |
51 if (dest_is_open) { | |
52 old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port); | |
53 old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port); | |
54 } | |
55 | 55 |
56 is_open_[port] = false; | 56 endpoints_[port].reset(); |
57 STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port. | |
58 | |
59 // Notify the other (destination) port if its state has changed. | |
60 if (dest_is_open) { | |
61 MojoWaitFlags new_dest_satisfied_flags = | |
62 SatisfiedFlagsNoLock(destination_port); | |
63 MojoWaitFlags new_dest_satisfiable_flags = | |
64 SatisfiableFlagsNoLock(destination_port); | |
65 if (new_dest_satisfied_flags != old_dest_satisfied_flags || | |
66 new_dest_satisfiable_flags != old_dest_satisfiable_flags) { | |
67 waiter_lists_[destination_port].AwakeWaitersForStateChange( | |
68 new_dest_satisfied_flags, new_dest_satisfiable_flags); | |
69 } | |
70 } | |
71 } | 57 } |
72 | 58 |
73 // TODO(vtl): Handle flags. | 59 // TODO(vtl): Handle flags. |
74 MojoResult MessagePipe::WriteMessage( | 60 MojoResult MessagePipe::WriteMessage( |
75 unsigned port, | 61 unsigned port, |
76 const void* bytes, uint32_t num_bytes, | 62 const void* bytes, uint32_t num_bytes, |
77 const MojoHandle* handles, uint32_t num_handles, | 63 const MojoHandle* handles, uint32_t num_handles, |
78 MojoWriteMessageFlags /*flags*/) { | 64 MojoWriteMessageFlags flags) { |
79 DCHECK(port == 0 || port == 1); | 65 DCHECK(port == 0 || port == 1); |
80 | 66 |
81 unsigned destination_port = DestinationPortFromSourcePort(port); | 67 unsigned destination_port = DestinationPortFromSourcePort(port); |
82 | 68 |
83 base::AutoLock locker(lock_); | 69 base::AutoLock locker(lock_); |
84 DCHECK(is_open_[port]); | 70 DCHECK(endpoints_[port].get()); |
85 | 71 |
86 // The destination port need not be open, unlike the source port. | 72 // The destination port need not be open, unlike the source port. |
87 if (!is_open_[destination_port]) | 73 if (!endpoints_[destination_port].get()) |
88 return MOJO_RESULT_FAILED_PRECONDITION; | 74 return MOJO_RESULT_FAILED_PRECONDITION; |
89 | 75 |
90 bool dest_was_empty = message_queues_[destination_port].empty(); | 76 return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes, |
91 | 77 handles, num_handles, |
92 // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. | 78 flags); |
93 message_queues_[destination_port].push_back( | |
94 MessageInTransit::Create(bytes, num_bytes)); | |
95 // TODO(vtl): Support sending handles. | |
96 | |
97 // The other (destination) port was empty and now isn't, so it should now be | |
98 // readable. Wake up anyone waiting on this. | |
99 if (dest_was_empty) { | |
100 waiter_lists_[destination_port].AwakeWaitersForStateChange( | |
101 SatisfiedFlagsNoLock(destination_port), | |
102 SatisfiableFlagsNoLock(destination_port)); | |
103 } | |
104 | |
105 return MOJO_RESULT_OK; | |
106 } | 79 } |
107 | 80 |
108 MojoResult MessagePipe::ReadMessage(unsigned port, | 81 MojoResult MessagePipe::ReadMessage(unsigned port, |
109 void* bytes, uint32_t* num_bytes, | 82 void* bytes, uint32_t* num_bytes, |
110 MojoHandle* handles, uint32_t* num_handles, | 83 MojoHandle* handles, uint32_t* num_handles, |
111 MojoReadMessageFlags flags) { | 84 MojoReadMessageFlags flags) { |
112 DCHECK(port == 0 || port == 1); | 85 DCHECK(port == 0 || port == 1); |
113 | 86 |
114 const uint32_t max_bytes = num_bytes ? *num_bytes : 0; | 87 base::AutoLock locker(lock_); |
115 // TODO(vtl): We'll need this later: | 88 DCHECK(endpoints_[port].get()); |
116 // const uint32_t max_handles = num_handles ? *num_handles : 0; | |
117 | 89 |
118 base::AutoLock locker(lock_); | 90 return endpoints_[port]->ReadMessage(bytes, num_bytes, |
119 DCHECK(is_open_[port]); | 91 handles, num_handles, |
120 | 92 flags); |
121 if (message_queues_[port].empty()) | |
122 return MOJO_RESULT_NOT_FOUND; | |
123 | |
124 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | |
125 // and release the lock immediately. | |
126 bool not_enough_space = false; | |
127 MessageInTransit* const message = message_queues_[port].front(); | |
128 if (num_bytes) | |
129 *num_bytes = message->data_size(); | |
130 if (message->data_size() <= max_bytes) | |
131 memcpy(bytes, message->data(), message->data_size()); | |
132 else | |
133 not_enough_space = true; | |
134 | |
135 // TODO(vtl): Support receiving handles. | |
136 if (num_handles) | |
137 *num_handles = 0; | |
138 | |
139 if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { | |
140 message_queues_[port].pop_front(); | |
141 message->Destroy(); | |
142 | |
143 // Now it's empty, thus no longer readable. | |
144 if (message_queues_[port].empty()) { | |
145 // It's currently not possible to wait for non-readability, but we should | |
146 // do the state change anyway. | |
147 waiter_lists_[port].AwakeWaitersForStateChange( | |
148 SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port)); | |
149 } | |
150 } | |
151 | |
152 if (not_enough_space) | |
153 return MOJO_RESULT_RESOURCE_EXHAUSTED; | |
154 | |
155 return MOJO_RESULT_OK; | |
156 } | 93 } |
157 | 94 |
158 MojoResult MessagePipe::AddWaiter(unsigned port, | 95 MojoResult MessagePipe::AddWaiter(unsigned port, |
159 Waiter* waiter, | 96 Waiter* waiter, |
160 MojoWaitFlags flags, | 97 MojoWaitFlags flags, |
161 MojoResult wake_result) { | 98 MojoResult wake_result) { |
162 DCHECK(port == 0 || port == 1); | 99 DCHECK(port == 0 || port == 1); |
163 | 100 |
164 base::AutoLock locker(lock_); | 101 base::AutoLock locker(lock_); |
165 DCHECK(is_open_[port]); | 102 DCHECK(endpoints_[port].get()); |
166 | 103 |
167 if ((flags & SatisfiedFlagsNoLock(port))) | 104 return endpoints_[port]->AddWaiter(waiter, flags, wake_result); |
168 return MOJO_RESULT_ALREADY_EXISTS; | |
169 if (!(flags & SatisfiableFlagsNoLock(port))) | |
170 return MOJO_RESULT_FAILED_PRECONDITION; | |
171 | |
172 waiter_lists_[port].AddWaiter(waiter, flags, wake_result); | |
173 return MOJO_RESULT_OK; | |
174 } | 105 } |
175 | 106 |
176 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { | 107 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { |
177 DCHECK(port == 0 || port == 1); | 108 DCHECK(port == 0 || port == 1); |
178 | 109 |
179 base::AutoLock locker(lock_); | 110 base::AutoLock locker(lock_); |
180 DCHECK(is_open_[port]); | 111 DCHECK(endpoints_[port].get()); |
181 | 112 |
182 waiter_lists_[port].RemoveWaiter(waiter); | 113 endpoints_[port]->RemoveWaiter(waiter); |
183 } | 114 } |
184 | 115 |
185 MessagePipe::~MessagePipe() { | 116 MessagePipe::~MessagePipe() { |
186 // Owned by the dispatchers. The owning dispatchers should only release us via | 117 // Owned by the dispatchers. The owning dispatchers should only release us via |
187 // their |Close()| method, which should inform us of being closed via our | 118 // their |Close()| method, which should inform us of being closed via our |
188 // |Close()|. Thus these should already be null. | 119 // |Close()|. Thus these should already be null. |
189 DCHECK(!is_open_[0]); | 120 DCHECK(!endpoints_[0].get()); |
190 DCHECK(!is_open_[1]); | 121 DCHECK(!endpoints_[1].get()); |
191 } | |
192 | |
193 MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) { | |
194 DCHECK(port == 0 || port == 1); | |
195 | |
196 unsigned destination_port = DestinationPortFromSourcePort(port); | |
197 | |
198 lock_.AssertAcquired(); | |
199 | |
200 MojoWaitFlags satisfied_flags = 0; | |
201 if (!message_queues_[port].empty()) | |
202 satisfied_flags |= MOJO_WAIT_FLAG_READABLE; | |
203 if (is_open_[destination_port]) | |
204 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; | |
205 | |
206 return satisfied_flags; | |
207 } | |
208 | |
209 MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) { | |
210 DCHECK(port == 0 || port == 1); | |
211 | |
212 unsigned destination_port = DestinationPortFromSourcePort(port); | |
213 | |
214 lock_.AssertAcquired(); | |
215 | |
216 MojoWaitFlags satisfiable_flags = 0; | |
217 if (!message_queues_[port].empty() || is_open_[destination_port]) | |
218 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; | |
219 if (is_open_[destination_port]) | |
220 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; | |
221 | |
222 return satisfiable_flags; | |
223 } | 122 } |
224 | 123 |
225 } // namespace system | 124 } // namespace system |
226 } // namespace mojo | 125 } // namespace mojo |
OLD | NEW |