Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(146)

Side by Side Diff: mojo/system/message_pipe.cc

Issue 27060003: Mojo: Abstract out the endpoints of MessagePipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « mojo/system/message_pipe.h ('k') | mojo/system/message_pipe_endpoint.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/message_pipe.h" 5 #include "mojo/system/message_pipe.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/stl_util.h" 8 #include "base/stl_util.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
9 #include "mojo/system/message_in_transit.h" 10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_endpoint.h"
10 12
11 namespace mojo { 13 namespace mojo {
12 namespace system { 14 namespace system {
13 15
14 namespace { 16 namespace {
15 17
16 unsigned DestinationPortFromSourcePort(unsigned port) { 18 unsigned DestinationPortFromSourcePort(unsigned port) {
17 DCHECK(port == 0 || port == 1); 19 DCHECK(port == 0 || port == 1);
18 return port ^ 1; 20 return port ^ 1;
19 } 21 }
20 22
21 } // namespace 23 } // namespace
22 24
25 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
26 scoped_ptr<MessagePipeEndpoint> endpoint_1) {
27 endpoints_[0].reset(endpoint_0.release());
28 endpoints_[1].reset(endpoint_1.release());
29 }
30
23 MessagePipe::MessagePipe() { 31 MessagePipe::MessagePipe() {
24 is_open_[0] = is_open_[1] = true; 32 endpoints_[0].reset(new LocalMessagePipeEndpoint());
33 endpoints_[1].reset(new LocalMessagePipeEndpoint());
25 } 34 }
26 35
27 void MessagePipe::CancelAllWaiters(unsigned port) { 36 void MessagePipe::CancelAllWaiters(unsigned port) {
28 DCHECK(port == 0 || port == 1); 37 DCHECK(port == 0 || port == 1);
29 38
30 base::AutoLock locker(lock_); 39 base::AutoLock locker(lock_);
31 DCHECK(is_open_[port]); 40 DCHECK(endpoints_[port].get());
32 41 endpoints_[port]->CancelAllWaiters();
33 waiter_lists_[port].CancelAllWaiters();
34 } 42 }
35 43
36 void MessagePipe::Close(unsigned port) { 44 void MessagePipe::Close(unsigned port) {
37 DCHECK(port == 0 || port == 1); 45 DCHECK(port == 0 || port == 1);
38 46
39 unsigned destination_port = DestinationPortFromSourcePort(port); 47 unsigned destination_port = DestinationPortFromSourcePort(port);
40 48
41 base::AutoLock locker(lock_); 49 base::AutoLock locker(lock_);
42 DCHECK(is_open_[port]); 50 DCHECK(endpoints_[port].get());
43 51
44 // Record the old state of the other (destination) port, so we can tell if it 52 endpoints_[port]->Close();
45 // changes. 53 if (endpoints_[destination_port].get())
46 // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we 54 endpoints_[destination_port]->OnPeerClose();
47 // don't have to do this.
48 MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
49 MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
50 bool dest_is_open = is_open_[destination_port];
51 if (dest_is_open) {
52 old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
53 old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
54 }
55 55
56 is_open_[port] = false; 56 endpoints_[port].reset();
57 STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
58
59 // Notify the other (destination) port if its state has changed.
60 if (dest_is_open) {
61 MojoWaitFlags new_dest_satisfied_flags =
62 SatisfiedFlagsNoLock(destination_port);
63 MojoWaitFlags new_dest_satisfiable_flags =
64 SatisfiableFlagsNoLock(destination_port);
65 if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
66 new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
67 waiter_lists_[destination_port].AwakeWaitersForStateChange(
68 new_dest_satisfied_flags, new_dest_satisfiable_flags);
69 }
70 }
71 } 57 }
72 58
73 // TODO(vtl): Handle flags. 59 // TODO(vtl): Handle flags.
74 MojoResult MessagePipe::WriteMessage( 60 MojoResult MessagePipe::WriteMessage(
75 unsigned port, 61 unsigned port,
76 const void* bytes, uint32_t num_bytes, 62 const void* bytes, uint32_t num_bytes,
77 const MojoHandle* handles, uint32_t num_handles, 63 const MojoHandle* handles, uint32_t num_handles,
78 MojoWriteMessageFlags /*flags*/) { 64 MojoWriteMessageFlags flags) {
79 DCHECK(port == 0 || port == 1); 65 DCHECK(port == 0 || port == 1);
80 66
81 unsigned destination_port = DestinationPortFromSourcePort(port); 67 unsigned destination_port = DestinationPortFromSourcePort(port);
82 68
83 base::AutoLock locker(lock_); 69 base::AutoLock locker(lock_);
84 DCHECK(is_open_[port]); 70 DCHECK(endpoints_[port].get());
85 71
86 // The destination port need not be open, unlike the source port. 72 // The destination port need not be open, unlike the source port.
87 if (!is_open_[destination_port]) 73 if (!endpoints_[destination_port].get())
88 return MOJO_RESULT_FAILED_PRECONDITION; 74 return MOJO_RESULT_FAILED_PRECONDITION;
89 75
90 bool dest_was_empty = message_queues_[destination_port].empty(); 76 return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
91 77 handles, num_handles,
92 // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. 78 flags);
93 message_queues_[destination_port].push_back(
94 MessageInTransit::Create(bytes, num_bytes));
95 // TODO(vtl): Support sending handles.
96
97 // The other (destination) port was empty and now isn't, so it should now be
98 // readable. Wake up anyone waiting on this.
99 if (dest_was_empty) {
100 waiter_lists_[destination_port].AwakeWaitersForStateChange(
101 SatisfiedFlagsNoLock(destination_port),
102 SatisfiableFlagsNoLock(destination_port));
103 }
104
105 return MOJO_RESULT_OK;
106 } 79 }
107 80
108 MojoResult MessagePipe::ReadMessage(unsigned port, 81 MojoResult MessagePipe::ReadMessage(unsigned port,
109 void* bytes, uint32_t* num_bytes, 82 void* bytes, uint32_t* num_bytes,
110 MojoHandle* handles, uint32_t* num_handles, 83 MojoHandle* handles, uint32_t* num_handles,
111 MojoReadMessageFlags flags) { 84 MojoReadMessageFlags flags) {
112 DCHECK(port == 0 || port == 1); 85 DCHECK(port == 0 || port == 1);
113 86
114 const uint32_t max_bytes = num_bytes ? *num_bytes : 0; 87 base::AutoLock locker(lock_);
115 // TODO(vtl): We'll need this later: 88 DCHECK(endpoints_[port].get());
116 // const uint32_t max_handles = num_handles ? *num_handles : 0;
117 89
118 base::AutoLock locker(lock_); 90 return endpoints_[port]->ReadMessage(bytes, num_bytes,
119 DCHECK(is_open_[port]); 91 handles, num_handles,
120 92 flags);
121 if (message_queues_[port].empty())
122 return MOJO_RESULT_NOT_FOUND;
123
124 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
125 // and release the lock immediately.
126 bool not_enough_space = false;
127 MessageInTransit* const message = message_queues_[port].front();
128 if (num_bytes)
129 *num_bytes = message->data_size();
130 if (message->data_size() <= max_bytes)
131 memcpy(bytes, message->data(), message->data_size());
132 else
133 not_enough_space = true;
134
135 // TODO(vtl): Support receiving handles.
136 if (num_handles)
137 *num_handles = 0;
138
139 if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
140 message_queues_[port].pop_front();
141 message->Destroy();
142
143 // Now it's empty, thus no longer readable.
144 if (message_queues_[port].empty()) {
145 // It's currently not possible to wait for non-readability, but we should
146 // do the state change anyway.
147 waiter_lists_[port].AwakeWaitersForStateChange(
148 SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
149 }
150 }
151
152 if (not_enough_space)
153 return MOJO_RESULT_RESOURCE_EXHAUSTED;
154
155 return MOJO_RESULT_OK;
156 } 93 }
157 94
158 MojoResult MessagePipe::AddWaiter(unsigned port, 95 MojoResult MessagePipe::AddWaiter(unsigned port,
159 Waiter* waiter, 96 Waiter* waiter,
160 MojoWaitFlags flags, 97 MojoWaitFlags flags,
161 MojoResult wake_result) { 98 MojoResult wake_result) {
162 DCHECK(port == 0 || port == 1); 99 DCHECK(port == 0 || port == 1);
163 100
164 base::AutoLock locker(lock_); 101 base::AutoLock locker(lock_);
165 DCHECK(is_open_[port]); 102 DCHECK(endpoints_[port].get());
166 103
167 if ((flags & SatisfiedFlagsNoLock(port))) 104 return endpoints_[port]->AddWaiter(waiter, flags, wake_result);
168 return MOJO_RESULT_ALREADY_EXISTS;
169 if (!(flags & SatisfiableFlagsNoLock(port)))
170 return MOJO_RESULT_FAILED_PRECONDITION;
171
172 waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
173 return MOJO_RESULT_OK;
174 } 105 }
175 106
176 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { 107 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
177 DCHECK(port == 0 || port == 1); 108 DCHECK(port == 0 || port == 1);
178 109
179 base::AutoLock locker(lock_); 110 base::AutoLock locker(lock_);
180 DCHECK(is_open_[port]); 111 DCHECK(endpoints_[port].get());
181 112
182 waiter_lists_[port].RemoveWaiter(waiter); 113 endpoints_[port]->RemoveWaiter(waiter);
183 } 114 }
184 115
185 MessagePipe::~MessagePipe() { 116 MessagePipe::~MessagePipe() {
186 // Owned by the dispatchers. The owning dispatchers should only release us via 117 // Owned by the dispatchers. The owning dispatchers should only release us via
187 // their |Close()| method, which should inform us of being closed via our 118 // their |Close()| method, which should inform us of being closed via our
188 // |Close()|. Thus these should already be null. 119 // |Close()|. Thus these should already be null.
189 DCHECK(!is_open_[0]); 120 DCHECK(!endpoints_[0].get());
190 DCHECK(!is_open_[1]); 121 DCHECK(!endpoints_[1].get());
191 }
192
193 MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
194 DCHECK(port == 0 || port == 1);
195
196 unsigned destination_port = DestinationPortFromSourcePort(port);
197
198 lock_.AssertAcquired();
199
200 MojoWaitFlags satisfied_flags = 0;
201 if (!message_queues_[port].empty())
202 satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
203 if (is_open_[destination_port])
204 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
205
206 return satisfied_flags;
207 }
208
209 MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
210 DCHECK(port == 0 || port == 1);
211
212 unsigned destination_port = DestinationPortFromSourcePort(port);
213
214 lock_.AssertAcquired();
215
216 MojoWaitFlags satisfiable_flags = 0;
217 if (!message_queues_[port].empty() || is_open_[destination_port])
218 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
219 if (is_open_[destination_port])
220 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
221
222 return satisfiable_flags;
223 } 122 }
224 123
225 } // namespace system 124 } // namespace system
226 } // namespace mojo 125 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/message_pipe.h ('k') | mojo/system/message_pipe_endpoint.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698