OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
(...skipping 245 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
256 | 256 |
257 if (port->state != Port::kReceiving) | 257 if (port->state != Port::kReceiving) |
258 return ERROR_PORT_STATE_UNEXPECTED; | 258 return ERROR_PORT_STATE_UNEXPECTED; |
259 | 259 |
260 port_status->has_messages = port->message_queue.HasNextMessage(); | 260 port_status->has_messages = port->message_queue.HasNextMessage(); |
261 port_status->receiving_messages = CanAcceptMoreMessages(port); | 261 port_status->receiving_messages = CanAcceptMoreMessages(port); |
262 port_status->peer_closed = port->peer_closed; | 262 port_status->peer_closed = port->peer_closed; |
263 return OK; | 263 return OK; |
264 } | 264 } |
265 | 265 |
266 int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) { | 266 int Node::GetMessage(const PortRef& port_ref, |
267 return GetMessageIf(port_ref, nullptr, message); | 267 ScopedMessage* message, |
268 } | 268 MessageFilter* filter) { |
269 | |
270 int Node::GetMessageIf(const PortRef& port_ref, | |
271 std::function<bool(const Message&)> selector, | |
272 ScopedMessage* message) { | |
273 *message = nullptr; | 269 *message = nullptr; |
274 | 270 |
275 DVLOG(4) << "GetMessageIf for " << port_ref.name() << "@" << name_; | 271 DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_; |
276 | 272 |
277 Port* port = port_ref.port(); | 273 Port* port = port_ref.port(); |
278 { | 274 { |
279 base::AutoLock lock(port->lock); | 275 base::AutoLock lock(port->lock); |
280 | 276 |
281 // This could also be treated like the port being unknown since the | 277 // This could also be treated like the port being unknown since the |
282 // embedder should no longer be referring to a port that has been sent. | 278 // embedder should no longer be referring to a port that has been sent. |
283 if (port->state != Port::kReceiving) | 279 if (port->state != Port::kReceiving) |
284 return ERROR_PORT_STATE_UNEXPECTED; | 280 return ERROR_PORT_STATE_UNEXPECTED; |
285 | 281 |
286 // Let the embedder get messages until there are no more before reporting | 282 // Let the embedder get messages until there are no more before reporting |
287 // that the peer closed its end. | 283 // that the peer closed its end. |
288 if (!CanAcceptMoreMessages(port)) | 284 if (!CanAcceptMoreMessages(port)) |
289 return ERROR_PORT_PEER_CLOSED; | 285 return ERROR_PORT_PEER_CLOSED; |
290 | 286 |
291 port->message_queue.GetNextMessageIf(std::move(selector), message); | 287 port->message_queue.GetNextMessage(message, filter); |
292 } | 288 } |
293 | 289 |
294 // Allow referenced ports to trigger PortStatusChanged calls. | 290 // Allow referenced ports to trigger PortStatusChanged calls. |
295 if (*message) { | 291 if (*message) { |
296 for (size_t i = 0; i < (*message)->num_ports(); ++i) { | 292 for (size_t i = 0; i < (*message)->num_ports(); ++i) { |
297 const PortName& new_port_name = (*message)->ports()[i]; | 293 const PortName& new_port_name = (*message)->ports()[i]; |
298 scoped_refptr<Port> new_port = GetPort(new_port_name); | 294 scoped_refptr<Port> new_port = GetPort(new_port_name); |
299 | 295 |
300 DCHECK(new_port) << "Port " << new_port_name << "@" << name_ | 296 DCHECK(new_port) << "Port " << new_port_name << "@" << name_ |
301 << " does not exist!"; | 297 << " does not exist!"; |
(...skipping 685 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
987 port->peer_port_name = port_descriptor.peer_port_name; | 983 port->peer_port_name = port_descriptor.peer_port_name; |
988 port->last_sequence_num_to_receive = | 984 port->last_sequence_num_to_receive = |
989 port_descriptor.last_sequence_num_to_receive; | 985 port_descriptor.last_sequence_num_to_receive; |
990 port->peer_closed = port_descriptor.peer_closed; | 986 port->peer_closed = port_descriptor.peer_closed; |
991 | 987 |
992 DVLOG(2) << "Accepting port " << port_name << " [peer_closed=" | 988 DVLOG(2) << "Accepting port " << port_name << " [peer_closed=" |
993 << port->peer_closed << "; last_sequence_num_to_receive=" | 989 << port->peer_closed << "; last_sequence_num_to_receive=" |
994 << port->last_sequence_num_to_receive << "]"; | 990 << port->last_sequence_num_to_receive << "]"; |
995 | 991 |
996 // A newly accepted port is not signalable until the message referencing the | 992 // A newly accepted port is not signalable until the message referencing the |
997 // new port finds its way to the consumer (see GetMessageIf). | 993 // new port finds its way to the consumer (see GetMessage). |
998 port->message_queue.set_signalable(false); | 994 port->message_queue.set_signalable(false); |
999 | 995 |
1000 int rv = AddPortWithName(port_name, port); | 996 int rv = AddPortWithName(port_name, port); |
1001 if (rv != OK) | 997 if (rv != OK) |
1002 return rv; | 998 return rv; |
1003 | 999 |
1004 // Allow referring port to forward messages. | 1000 // Allow referring port to forward messages. |
1005 delegate_->ForwardMessage( | 1001 delegate_->ForwardMessage( |
1006 port_descriptor.referring_node_name, | 1002 port_descriptor.referring_node_name, |
1007 NewInternalMessage(port_descriptor.referring_port_name, | 1003 NewInternalMessage(port_descriptor.referring_port_name, |
(...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1169 return OK; | 1165 return OK; |
1170 } | 1166 } |
1171 | 1167 |
1172 int Node::ForwardMessages_Locked(const LockedPort& port, | 1168 int Node::ForwardMessages_Locked(const LockedPort& port, |
1173 const PortName &port_name) { | 1169 const PortName &port_name) { |
1174 ports_lock_.AssertAcquired(); | 1170 ports_lock_.AssertAcquired(); |
1175 port->lock.AssertAcquired(); | 1171 port->lock.AssertAcquired(); |
1176 | 1172 |
1177 for (;;) { | 1173 for (;;) { |
1178 ScopedMessage message; | 1174 ScopedMessage message; |
1179 port->message_queue.GetNextMessageIf(nullptr, &message); | 1175 port->message_queue.GetNextMessage(&message, nullptr); |
1180 if (!message) | 1176 if (!message) |
1181 break; | 1177 break; |
1182 | 1178 |
1183 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get()); | 1179 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get()); |
1184 if (rv != OK) | 1180 if (rv != OK) |
1185 return rv; | 1181 return rv; |
1186 | 1182 |
1187 delegate_->ForwardMessage(port->peer_node_name, std::move(message)); | 1183 delegate_->ForwardMessage(port->peer_node_name, std::move(message)); |
1188 } | 1184 } |
1189 return OK; | 1185 return OK; |
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1376 | 1372 |
1377 if (num_data_bytes) | 1373 if (num_data_bytes) |
1378 memcpy(header + 1, data, num_data_bytes); | 1374 memcpy(header + 1, data, num_data_bytes); |
1379 | 1375 |
1380 return message; | 1376 return message; |
1381 } | 1377 } |
1382 | 1378 |
1383 } // namespace ports | 1379 } // namespace ports |
1384 } // namespace edk | 1380 } // namespace edk |
1385 } // namespace mojo | 1381 } // namespace mojo |
OLD | NEW |