Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/message_pipe.h" | 5 #include "mojo/system/message_pipe.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "base/stl_util.h" | 8 #include "base/stl_util.h" |
| 9 #include "mojo/system/local_message_pipe_endpoint.h" | |
| 9 #include "mojo/system/message_in_transit.h" | 10 #include "mojo/system/message_in_transit.h" |
| 11 #include "mojo/system/message_pipe_endpoint.h" | |
| 10 | 12 |
| 11 namespace mojo { | 13 namespace mojo { |
| 12 namespace system { | 14 namespace system { |
| 13 | 15 |
| 14 namespace { | 16 namespace { |
| 15 | 17 |
| 16 unsigned DestinationPortFromSourcePort(unsigned port) { | 18 unsigned DestinationPortFromSourcePort(unsigned port) { |
| 17 DCHECK(port == 0 || port == 1); | 19 DCHECK(port == 0 || port == 1); |
| 18 return port ^ 1; | 20 return port ^ 1; |
| 19 } | 21 } |
| 20 | 22 |
| 21 } // namespace | 23 } // namespace |
| 22 | 24 |
| 23 MessagePipe::MessagePipe() { | 25 MessagePipe::MessagePipe() { |
| 24 is_open_[0] = is_open_[1] = true; | 26 endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
|
darin (slow to review)
2013/10/15 06:50:36
I see. This is the connected pair case.
| |
| 27 endpoints_[1].reset(new LocalMessagePipeEndpoint()); | |
| 25 } | 28 } |
| 26 | 29 |
| 27 void MessagePipe::CancelAllWaiters(unsigned port) { | 30 void MessagePipe::CancelAllWaiters(unsigned port) { |
| 28 DCHECK(port == 0 || port == 1); | 31 DCHECK(port == 0 || port == 1); |
| 29 | 32 |
| 30 base::AutoLock locker(lock_); | 33 base::AutoLock locker(lock_); |
| 31 DCHECK(is_open_[port]); | 34 DCHECK(endpoints_[port].get()); |
| 32 | 35 endpoints_[port]->CancelAllWaiters(); |
| 33 waiter_lists_[port].CancelAllWaiters(); | |
| 34 } | 36 } |
| 35 | 37 |
| 36 void MessagePipe::Close(unsigned port) { | 38 void MessagePipe::Close(unsigned port) { |
| 37 DCHECK(port == 0 || port == 1); | 39 DCHECK(port == 0 || port == 1); |
| 38 | 40 |
| 39 unsigned destination_port = DestinationPortFromSourcePort(port); | 41 unsigned destination_port = DestinationPortFromSourcePort(port); |
| 40 | 42 |
| 41 base::AutoLock locker(lock_); | 43 base::AutoLock locker(lock_); |
| 42 DCHECK(is_open_[port]); | 44 DCHECK(endpoints_[port].get()); |
| 43 | 45 |
| 44 // Record the old state of the other (destination) port, so we can tell if it | 46 endpoints_[port]->Close(); |
| 45 // changes. | 47 if (endpoints_[destination_port].get()) |
| 46 // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we | 48 endpoints_[destination_port]->OnPeerClose(); |
| 47 // don't have to do this. | |
| 48 MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE; | |
| 49 MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE; | |
| 50 bool dest_is_open = is_open_[destination_port]; | |
| 51 if (dest_is_open) { | |
| 52 old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port); | |
| 53 old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port); | |
| 54 } | |
| 55 | 49 |
| 56 is_open_[port] = false; | 50 endpoints_[port].reset(); |
| 57 STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port. | |
| 58 | |
| 59 // Notify the other (destination) port if its state has changed. | |
| 60 if (dest_is_open) { | |
| 61 MojoWaitFlags new_dest_satisfied_flags = | |
| 62 SatisfiedFlagsNoLock(destination_port); | |
| 63 MojoWaitFlags new_dest_satisfiable_flags = | |
| 64 SatisfiableFlagsNoLock(destination_port); | |
| 65 if (new_dest_satisfied_flags != old_dest_satisfied_flags || | |
| 66 new_dest_satisfiable_flags != old_dest_satisfiable_flags) { | |
| 67 waiter_lists_[destination_port].AwakeWaitersForStateChange( | |
| 68 new_dest_satisfied_flags, new_dest_satisfiable_flags); | |
| 69 } | |
| 70 } | |
| 71 } | 51 } |
| 72 | 52 |
| 73 // TODO(vtl): Handle flags. | 53 // TODO(vtl): Handle flags. |
| 74 MojoResult MessagePipe::WriteMessage( | 54 MojoResult MessagePipe::WriteMessage( |
| 75 unsigned port, | 55 unsigned port, |
| 76 const void* bytes, uint32_t num_bytes, | 56 const void* bytes, uint32_t num_bytes, |
| 77 const MojoHandle* handles, uint32_t num_handles, | 57 const MojoHandle* handles, uint32_t num_handles, |
| 78 MojoWriteMessageFlags /*flags*/) { | 58 MojoWriteMessageFlags flags) { |
| 79 DCHECK(port == 0 || port == 1); | 59 DCHECK(port == 0 || port == 1); |
| 80 | 60 |
| 81 unsigned destination_port = DestinationPortFromSourcePort(port); | 61 unsigned destination_port = DestinationPortFromSourcePort(port); |
| 82 | 62 |
| 83 base::AutoLock locker(lock_); | 63 base::AutoLock locker(lock_); |
| 84 DCHECK(is_open_[port]); | 64 DCHECK(endpoints_[port].get()); |
| 85 | 65 |
| 86 // The destination port need not be open, unlike the source port. | 66 // The destination port need not be open, unlike the source port. |
| 87 if (!is_open_[destination_port]) | 67 if (!endpoints_[destination_port].get()) |
| 88 return MOJO_RESULT_FAILED_PRECONDITION; | 68 return MOJO_RESULT_FAILED_PRECONDITION; |
| 89 | 69 |
| 90 bool dest_was_empty = message_queues_[destination_port].empty(); | 70 return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes, |
| 91 | 71 handles, num_handles, |
| 92 // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. | 72 flags); |
| 93 message_queues_[destination_port].push_back( | |
| 94 MessageInTransit::Create(bytes, num_bytes)); | |
| 95 // TODO(vtl): Support sending handles. | |
| 96 | |
| 97 // The other (destination) port was empty and now isn't, so it should now be | |
| 98 // readable. Wake up anyone waiting on this. | |
| 99 if (dest_was_empty) { | |
| 100 waiter_lists_[destination_port].AwakeWaitersForStateChange( | |
| 101 SatisfiedFlagsNoLock(destination_port), | |
| 102 SatisfiableFlagsNoLock(destination_port)); | |
| 103 } | |
| 104 | |
| 105 return MOJO_RESULT_OK; | |
| 106 } | 73 } |
| 107 | 74 |
| 108 MojoResult MessagePipe::ReadMessage(unsigned port, | 75 MojoResult MessagePipe::ReadMessage(unsigned port, |
| 109 void* bytes, uint32_t* num_bytes, | 76 void* bytes, uint32_t* num_bytes, |
| 110 MojoHandle* handles, uint32_t* num_handles, | 77 MojoHandle* handles, uint32_t* num_handles, |
| 111 MojoReadMessageFlags flags) { | 78 MojoReadMessageFlags flags) { |
| 112 DCHECK(port == 0 || port == 1); | 79 DCHECK(port == 0 || port == 1); |
| 113 | 80 |
| 114 const uint32_t max_bytes = num_bytes ? *num_bytes : 0; | 81 base::AutoLock locker(lock_); |
| 115 // TODO(vtl): We'll need this later: | 82 DCHECK(endpoints_[port].get()); |
| 116 // const uint32_t max_handles = num_handles ? *num_handles : 0; | |
| 117 | 83 |
| 118 base::AutoLock locker(lock_); | 84 return endpoints_[port]->ReadMessage(bytes, num_bytes, |
| 119 DCHECK(is_open_[port]); | 85 handles, num_handles, |
| 120 | 86 flags); |
| 121 if (message_queues_[port].empty()) | |
| 122 return MOJO_RESULT_NOT_FOUND; | |
| 123 | |
| 124 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | |
| 125 // and release the lock immediately. | |
| 126 bool not_enough_space = false; | |
| 127 MessageInTransit* const message = message_queues_[port].front(); | |
| 128 if (num_bytes) | |
| 129 *num_bytes = message->data_size(); | |
| 130 if (message->data_size() <= max_bytes) | |
| 131 memcpy(bytes, message->data(), message->data_size()); | |
| 132 else | |
| 133 not_enough_space = true; | |
| 134 | |
| 135 // TODO(vtl): Support receiving handles. | |
| 136 if (num_handles) | |
| 137 *num_handles = 0; | |
| 138 | |
| 139 if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { | |
| 140 message_queues_[port].pop_front(); | |
| 141 message->Destroy(); | |
| 142 | |
| 143 // Now it's empty, thus no longer readable. | |
| 144 if (message_queues_[port].empty()) { | |
| 145 // It's currently not possible to wait for non-readability, but we should | |
| 146 // do the state change anyway. | |
| 147 waiter_lists_[port].AwakeWaitersForStateChange( | |
| 148 SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port)); | |
| 149 } | |
| 150 } | |
| 151 | |
| 152 if (not_enough_space) | |
| 153 return MOJO_RESULT_RESOURCE_EXHAUSTED; | |
| 154 | |
| 155 return MOJO_RESULT_OK; | |
| 156 } | 87 } |
| 157 | 88 |
| 158 MojoResult MessagePipe::AddWaiter(unsigned port, | 89 MojoResult MessagePipe::AddWaiter(unsigned port, |
| 159 Waiter* waiter, | 90 Waiter* waiter, |
| 160 MojoWaitFlags flags, | 91 MojoWaitFlags flags, |
| 161 MojoResult wake_result) { | 92 MojoResult wake_result) { |
| 162 DCHECK(port == 0 || port == 1); | 93 DCHECK(port == 0 || port == 1); |
| 163 | 94 |
| 164 base::AutoLock locker(lock_); | 95 base::AutoLock locker(lock_); |
| 165 DCHECK(is_open_[port]); | 96 DCHECK(endpoints_[port].get()); |
| 166 | 97 |
| 167 if ((flags & SatisfiedFlagsNoLock(port))) | 98 return endpoints_[port]->AddWaiter(waiter, flags, wake_result); |
| 168 return MOJO_RESULT_ALREADY_EXISTS; | |
| 169 if (!(flags & SatisfiableFlagsNoLock(port))) | |
| 170 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 171 | |
| 172 waiter_lists_[port].AddWaiter(waiter, flags, wake_result); | |
| 173 return MOJO_RESULT_OK; | |
| 174 } | 99 } |
| 175 | 100 |
| 176 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { | 101 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { |
| 177 DCHECK(port == 0 || port == 1); | 102 DCHECK(port == 0 || port == 1); |
| 178 | 103 |
| 179 base::AutoLock locker(lock_); | 104 base::AutoLock locker(lock_); |
| 180 DCHECK(is_open_[port]); | 105 DCHECK(endpoints_[port].get()); |
| 181 | 106 |
| 182 waiter_lists_[port].RemoveWaiter(waiter); | 107 endpoints_[port]->RemoveWaiter(waiter); |
| 183 } | 108 } |
| 184 | 109 |
| 185 MessagePipe::~MessagePipe() { | 110 MessagePipe::~MessagePipe() { |
| 186 // Owned by the dispatchers. The owning dispatchers should only release us via | 111 // Owned by the dispatchers. The owning dispatchers should only release us via |
| 187 // their |Close()| method, which should inform us of being closed via our | 112 // their |Close()| method, which should inform us of being closed via our |
| 188 // |Close()|. Thus these should already be null. | 113 // |Close()|. Thus these should already be null. |
| 189 DCHECK(!is_open_[0]); | 114 DCHECK(!endpoints_[0].get()); |
| 190 DCHECK(!is_open_[1]); | 115 DCHECK(!endpoints_[1].get()); |
| 191 } | |
| 192 | |
| 193 MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) { | |
| 194 DCHECK(port == 0 || port == 1); | |
| 195 | |
| 196 unsigned destination_port = DestinationPortFromSourcePort(port); | |
| 197 | |
| 198 lock_.AssertAcquired(); | |
| 199 | |
| 200 MojoWaitFlags satisfied_flags = 0; | |
| 201 if (!message_queues_[port].empty()) | |
| 202 satisfied_flags |= MOJO_WAIT_FLAG_READABLE; | |
| 203 if (is_open_[destination_port]) | |
| 204 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; | |
| 205 | |
| 206 return satisfied_flags; | |
| 207 } | |
| 208 | |
| 209 MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) { | |
| 210 DCHECK(port == 0 || port == 1); | |
| 211 | |
| 212 unsigned destination_port = DestinationPortFromSourcePort(port); | |
| 213 | |
| 214 lock_.AssertAcquired(); | |
| 215 | |
| 216 MojoWaitFlags satisfiable_flags = 0; | |
| 217 if (!message_queues_[port].empty() || is_open_[destination_port]) | |
| 218 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; | |
| 219 if (is_open_[destination_port]) | |
| 220 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; | |
| 221 | |
| 222 return satisfiable_flags; | |
| 223 } | 116 } |
| 224 | 117 |
| 225 } // namespace system | 118 } // namespace system |
| 226 } // namespace mojo | 119 } // namespace mojo |
| OLD | NEW |