Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(318)

Side by Side Diff: mojo/system/message_pipe.cc

Issue 27060003: Mojo: Abstract out the endpoints of MessagePipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/message_pipe.h" 5 #include "mojo/system/message_pipe.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/stl_util.h" 8 #include "base/stl_util.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
9 #include "mojo/system/message_in_transit.h" 10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_endpoint.h"
10 12
11 namespace mojo { 13 namespace mojo {
12 namespace system { 14 namespace system {
13 15
14 namespace { 16 namespace {
15 17
16 unsigned DestinationPortFromSourcePort(unsigned port) { 18 unsigned DestinationPortFromSourcePort(unsigned port) {
17 DCHECK(port == 0 || port == 1); 19 DCHECK(port == 0 || port == 1);
18 return port ^ 1; 20 return port ^ 1;
19 } 21 }
20 22
21 } // namespace 23 } // namespace
22 24
23 MessagePipe::MessagePipe() { 25 MessagePipe::MessagePipe() {
24 is_open_[0] = is_open_[1] = true; 26 endpoints_[0].reset(new LocalMessagePipeEndpoint());
darin (slow to review) 2013/10/15 06:50:36 I see. This is the connected pair case.
27 endpoints_[1].reset(new LocalMessagePipeEndpoint());
25 } 28 }
26 29
27 void MessagePipe::CancelAllWaiters(unsigned port) { 30 void MessagePipe::CancelAllWaiters(unsigned port) {
28 DCHECK(port == 0 || port == 1); 31 DCHECK(port == 0 || port == 1);
29 32
30 base::AutoLock locker(lock_); 33 base::AutoLock locker(lock_);
31 DCHECK(is_open_[port]); 34 DCHECK(endpoints_[port].get());
32 35 endpoints_[port]->CancelAllWaiters();
33 waiter_lists_[port].CancelAllWaiters();
34 } 36 }
35 37
36 void MessagePipe::Close(unsigned port) { 38 void MessagePipe::Close(unsigned port) {
37 DCHECK(port == 0 || port == 1); 39 DCHECK(port == 0 || port == 1);
38 40
39 unsigned destination_port = DestinationPortFromSourcePort(port); 41 unsigned destination_port = DestinationPortFromSourcePort(port);
40 42
41 base::AutoLock locker(lock_); 43 base::AutoLock locker(lock_);
42 DCHECK(is_open_[port]); 44 DCHECK(endpoints_[port].get());
43 45
44 // Record the old state of the other (destination) port, so we can tell if it 46 endpoints_[port]->Close();
45 // changes. 47 if (endpoints_[destination_port].get())
46 // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we 48 endpoints_[destination_port]->OnPeerClose();
47 // don't have to do this.
48 MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
49 MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
50 bool dest_is_open = is_open_[destination_port];
51 if (dest_is_open) {
52 old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
53 old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
54 }
55 49
56 is_open_[port] = false; 50 endpoints_[port].reset();
57 STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
58
59 // Notify the other (destination) port if its state has changed.
60 if (dest_is_open) {
61 MojoWaitFlags new_dest_satisfied_flags =
62 SatisfiedFlagsNoLock(destination_port);
63 MojoWaitFlags new_dest_satisfiable_flags =
64 SatisfiableFlagsNoLock(destination_port);
65 if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
66 new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
67 waiter_lists_[destination_port].AwakeWaitersForStateChange(
68 new_dest_satisfied_flags, new_dest_satisfiable_flags);
69 }
70 }
71 } 51 }
72 52
73 // TODO(vtl): Handle flags. 53 // TODO(vtl): Handle flags.
74 MojoResult MessagePipe::WriteMessage( 54 MojoResult MessagePipe::WriteMessage(
75 unsigned port, 55 unsigned port,
76 const void* bytes, uint32_t num_bytes, 56 const void* bytes, uint32_t num_bytes,
77 const MojoHandle* handles, uint32_t num_handles, 57 const MojoHandle* handles, uint32_t num_handles,
78 MojoWriteMessageFlags /*flags*/) { 58 MojoWriteMessageFlags flags) {
79 DCHECK(port == 0 || port == 1); 59 DCHECK(port == 0 || port == 1);
80 60
81 unsigned destination_port = DestinationPortFromSourcePort(port); 61 unsigned destination_port = DestinationPortFromSourcePort(port);
82 62
83 base::AutoLock locker(lock_); 63 base::AutoLock locker(lock_);
84 DCHECK(is_open_[port]); 64 DCHECK(endpoints_[port].get());
85 65
86 // The destination port need not be open, unlike the source port. 66 // The destination port need not be open, unlike the source port.
87 if (!is_open_[destination_port]) 67 if (!endpoints_[destination_port].get())
88 return MOJO_RESULT_FAILED_PRECONDITION; 68 return MOJO_RESULT_FAILED_PRECONDITION;
89 69
90 bool dest_was_empty = message_queues_[destination_port].empty(); 70 return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
91 71 handles, num_handles,
92 // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. 72 flags);
93 message_queues_[destination_port].push_back(
94 MessageInTransit::Create(bytes, num_bytes));
95 // TODO(vtl): Support sending handles.
96
97 // The other (destination) port was empty and now isn't, so it should now be
98 // readable. Wake up anyone waiting on this.
99 if (dest_was_empty) {
100 waiter_lists_[destination_port].AwakeWaitersForStateChange(
101 SatisfiedFlagsNoLock(destination_port),
102 SatisfiableFlagsNoLock(destination_port));
103 }
104
105 return MOJO_RESULT_OK;
106 } 73 }
107 74
108 MojoResult MessagePipe::ReadMessage(unsigned port, 75 MojoResult MessagePipe::ReadMessage(unsigned port,
109 void* bytes, uint32_t* num_bytes, 76 void* bytes, uint32_t* num_bytes,
110 MojoHandle* handles, uint32_t* num_handles, 77 MojoHandle* handles, uint32_t* num_handles,
111 MojoReadMessageFlags flags) { 78 MojoReadMessageFlags flags) {
112 DCHECK(port == 0 || port == 1); 79 DCHECK(port == 0 || port == 1);
113 80
114 const uint32_t max_bytes = num_bytes ? *num_bytes : 0; 81 base::AutoLock locker(lock_);
115 // TODO(vtl): We'll need this later: 82 DCHECK(endpoints_[port].get());
116 // const uint32_t max_handles = num_handles ? *num_handles : 0;
117 83
118 base::AutoLock locker(lock_); 84 return endpoints_[port]->ReadMessage(bytes, num_bytes,
119 DCHECK(is_open_[port]); 85 handles, num_handles,
120 86 flags);
121 if (message_queues_[port].empty())
122 return MOJO_RESULT_NOT_FOUND;
123
124 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
125 // and release the lock immediately.
126 bool not_enough_space = false;
127 MessageInTransit* const message = message_queues_[port].front();
128 if (num_bytes)
129 *num_bytes = message->data_size();
130 if (message->data_size() <= max_bytes)
131 memcpy(bytes, message->data(), message->data_size());
132 else
133 not_enough_space = true;
134
135 // TODO(vtl): Support receiving handles.
136 if (num_handles)
137 *num_handles = 0;
138
139 if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
140 message_queues_[port].pop_front();
141 message->Destroy();
142
143 // Now it's empty, thus no longer readable.
144 if (message_queues_[port].empty()) {
145 // It's currently not possible to wait for non-readability, but we should
146 // do the state change anyway.
147 waiter_lists_[port].AwakeWaitersForStateChange(
148 SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
149 }
150 }
151
152 if (not_enough_space)
153 return MOJO_RESULT_RESOURCE_EXHAUSTED;
154
155 return MOJO_RESULT_OK;
156 } 87 }
157 88
158 MojoResult MessagePipe::AddWaiter(unsigned port, 89 MojoResult MessagePipe::AddWaiter(unsigned port,
159 Waiter* waiter, 90 Waiter* waiter,
160 MojoWaitFlags flags, 91 MojoWaitFlags flags,
161 MojoResult wake_result) { 92 MojoResult wake_result) {
162 DCHECK(port == 0 || port == 1); 93 DCHECK(port == 0 || port == 1);
163 94
164 base::AutoLock locker(lock_); 95 base::AutoLock locker(lock_);
165 DCHECK(is_open_[port]); 96 DCHECK(endpoints_[port].get());
166 97
167 if ((flags & SatisfiedFlagsNoLock(port))) 98 return endpoints_[port]->AddWaiter(waiter, flags, wake_result);
168 return MOJO_RESULT_ALREADY_EXISTS;
169 if (!(flags & SatisfiableFlagsNoLock(port)))
170 return MOJO_RESULT_FAILED_PRECONDITION;
171
172 waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
173 return MOJO_RESULT_OK;
174 } 99 }
175 100
176 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { 101 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
177 DCHECK(port == 0 || port == 1); 102 DCHECK(port == 0 || port == 1);
178 103
179 base::AutoLock locker(lock_); 104 base::AutoLock locker(lock_);
180 DCHECK(is_open_[port]); 105 DCHECK(endpoints_[port].get());
181 106
182 waiter_lists_[port].RemoveWaiter(waiter); 107 endpoints_[port]->RemoveWaiter(waiter);
183 } 108 }
184 109
185 MessagePipe::~MessagePipe() { 110 MessagePipe::~MessagePipe() {
186 // Owned by the dispatchers. The owning dispatchers should only release us via 111 // Owned by the dispatchers. The owning dispatchers should only release us via
187 // their |Close()| method, which should inform us of being closed via our 112 // their |Close()| method, which should inform us of being closed via our
188 // |Close()|. Thus these should already be null. 113 // |Close()|. Thus these should already be null.
189 DCHECK(!is_open_[0]); 114 DCHECK(!endpoints_[0].get());
190 DCHECK(!is_open_[1]); 115 DCHECK(!endpoints_[1].get());
191 }
192
193 MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
194 DCHECK(port == 0 || port == 1);
195
196 unsigned destination_port = DestinationPortFromSourcePort(port);
197
198 lock_.AssertAcquired();
199
200 MojoWaitFlags satisfied_flags = 0;
201 if (!message_queues_[port].empty())
202 satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
203 if (is_open_[destination_port])
204 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
205
206 return satisfied_flags;
207 }
208
209 MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
210 DCHECK(port == 0 || port == 1);
211
212 unsigned destination_port = DestinationPortFromSourcePort(port);
213
214 lock_.AssertAcquired();
215
216 MojoWaitFlags satisfiable_flags = 0;
217 if (!message_queues_[port].empty() || is_open_[destination_port])
218 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
219 if (is_open_[destination_port])
220 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
221
222 return satisfiable_flags;
223 } 116 }
224 117
225 } // namespace system 118 } // namespace system
226 } // namespace mojo 119 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698