| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 245 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 256 | 256 |
| 257 if (port->state != Port::kReceiving) | 257 if (port->state != Port::kReceiving) |
| 258 return ERROR_PORT_STATE_UNEXPECTED; | 258 return ERROR_PORT_STATE_UNEXPECTED; |
| 259 | 259 |
| 260 port_status->has_messages = port->message_queue.HasNextMessage(); | 260 port_status->has_messages = port->message_queue.HasNextMessage(); |
| 261 port_status->receiving_messages = CanAcceptMoreMessages(port); | 261 port_status->receiving_messages = CanAcceptMoreMessages(port); |
| 262 port_status->peer_closed = port->peer_closed; | 262 port_status->peer_closed = port->peer_closed; |
| 263 return OK; | 263 return OK; |
| 264 } | 264 } |
| 265 | 265 |
| 266 int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) { | 266 int Node::GetMessage(const PortRef& port_ref, |
| 267 return GetMessageIf(port_ref, nullptr, message); | 267 ScopedMessage* message, |
| 268 } | 268 MessageFilter* filter) { |
| 269 | |
| 270 int Node::GetMessageIf(const PortRef& port_ref, | |
| 271 std::function<bool(const Message&)> selector, | |
| 272 ScopedMessage* message) { | |
| 273 *message = nullptr; | 269 *message = nullptr; |
| 274 | 270 |
| 275 DVLOG(4) << "GetMessageIf for " << port_ref.name() << "@" << name_; | 271 DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_; |
| 276 | 272 |
| 277 Port* port = port_ref.port(); | 273 Port* port = port_ref.port(); |
| 278 { | 274 { |
| 279 base::AutoLock lock(port->lock); | 275 base::AutoLock lock(port->lock); |
| 280 | 276 |
| 281 // This could also be treated like the port being unknown since the | 277 // This could also be treated like the port being unknown since the |
| 282 // embedder should no longer be referring to a port that has been sent. | 278 // embedder should no longer be referring to a port that has been sent. |
| 283 if (port->state != Port::kReceiving) | 279 if (port->state != Port::kReceiving) |
| 284 return ERROR_PORT_STATE_UNEXPECTED; | 280 return ERROR_PORT_STATE_UNEXPECTED; |
| 285 | 281 |
| 286 // Let the embedder get messages until there are no more before reporting | 282 // Let the embedder get messages until there are no more before reporting |
| 287 // that the peer closed its end. | 283 // that the peer closed its end. |
| 288 if (!CanAcceptMoreMessages(port)) | 284 if (!CanAcceptMoreMessages(port)) |
| 289 return ERROR_PORT_PEER_CLOSED; | 285 return ERROR_PORT_PEER_CLOSED; |
| 290 | 286 |
| 291 port->message_queue.GetNextMessageIf(std::move(selector), message); | 287 port->message_queue.GetNextMessage(message, filter); |
| 292 } | 288 } |
| 293 | 289 |
| 294 // Allow referenced ports to trigger PortStatusChanged calls. | 290 // Allow referenced ports to trigger PortStatusChanged calls. |
| 295 if (*message) { | 291 if (*message) { |
| 296 for (size_t i = 0; i < (*message)->num_ports(); ++i) { | 292 for (size_t i = 0; i < (*message)->num_ports(); ++i) { |
| 297 const PortName& new_port_name = (*message)->ports()[i]; | 293 const PortName& new_port_name = (*message)->ports()[i]; |
| 298 scoped_refptr<Port> new_port = GetPort(new_port_name); | 294 scoped_refptr<Port> new_port = GetPort(new_port_name); |
| 299 | 295 |
| 300 DCHECK(new_port) << "Port " << new_port_name << "@" << name_ | 296 DCHECK(new_port) << "Port " << new_port_name << "@" << name_ |
| 301 << " does not exist!"; | 297 << " does not exist!"; |
| (...skipping 685 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 987 port->peer_port_name = port_descriptor.peer_port_name; | 983 port->peer_port_name = port_descriptor.peer_port_name; |
| 988 port->last_sequence_num_to_receive = | 984 port->last_sequence_num_to_receive = |
| 989 port_descriptor.last_sequence_num_to_receive; | 985 port_descriptor.last_sequence_num_to_receive; |
| 990 port->peer_closed = port_descriptor.peer_closed; | 986 port->peer_closed = port_descriptor.peer_closed; |
| 991 | 987 |
| 992 DVLOG(2) << "Accepting port " << port_name << " [peer_closed=" | 988 DVLOG(2) << "Accepting port " << port_name << " [peer_closed=" |
| 993 << port->peer_closed << "; last_sequence_num_to_receive=" | 989 << port->peer_closed << "; last_sequence_num_to_receive=" |
| 994 << port->last_sequence_num_to_receive << "]"; | 990 << port->last_sequence_num_to_receive << "]"; |
| 995 | 991 |
| 996 // A newly accepted port is not signalable until the message referencing the | 992 // A newly accepted port is not signalable until the message referencing the |
| 997 // new port finds its way to the consumer (see GetMessageIf). | 993 // new port finds its way to the consumer (see GetMessage). |
| 998 port->message_queue.set_signalable(false); | 994 port->message_queue.set_signalable(false); |
| 999 | 995 |
| 1000 int rv = AddPortWithName(port_name, port); | 996 int rv = AddPortWithName(port_name, port); |
| 1001 if (rv != OK) | 997 if (rv != OK) |
| 1002 return rv; | 998 return rv; |
| 1003 | 999 |
| 1004 // Allow referring port to forward messages. | 1000 // Allow referring port to forward messages. |
| 1005 delegate_->ForwardMessage( | 1001 delegate_->ForwardMessage( |
| 1006 port_descriptor.referring_node_name, | 1002 port_descriptor.referring_node_name, |
| 1007 NewInternalMessage(port_descriptor.referring_port_name, | 1003 NewInternalMessage(port_descriptor.referring_port_name, |
| (...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1169 return OK; | 1165 return OK; |
| 1170 } | 1166 } |
| 1171 | 1167 |
| 1172 int Node::ForwardMessages_Locked(const LockedPort& port, | 1168 int Node::ForwardMessages_Locked(const LockedPort& port, |
| 1173 const PortName &port_name) { | 1169 const PortName &port_name) { |
| 1174 ports_lock_.AssertAcquired(); | 1170 ports_lock_.AssertAcquired(); |
| 1175 port->lock.AssertAcquired(); | 1171 port->lock.AssertAcquired(); |
| 1176 | 1172 |
| 1177 for (;;) { | 1173 for (;;) { |
| 1178 ScopedMessage message; | 1174 ScopedMessage message; |
| 1179 port->message_queue.GetNextMessageIf(nullptr, &message); | 1175 port->message_queue.GetNextMessage(&message, nullptr); |
| 1180 if (!message) | 1176 if (!message) |
| 1181 break; | 1177 break; |
| 1182 | 1178 |
| 1183 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get()); | 1179 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get()); |
| 1184 if (rv != OK) | 1180 if (rv != OK) |
| 1185 return rv; | 1181 return rv; |
| 1186 | 1182 |
| 1187 delegate_->ForwardMessage(port->peer_node_name, std::move(message)); | 1183 delegate_->ForwardMessage(port->peer_node_name, std::move(message)); |
| 1188 } | 1184 } |
| 1189 return OK; | 1185 return OK; |
| (...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1376 | 1372 |
| 1377 if (num_data_bytes) | 1373 if (num_data_bytes) |
| 1378 memcpy(header + 1, data, num_data_bytes); | 1374 memcpy(header + 1, data, num_data_bytes); |
| 1379 | 1375 |
| 1380 return message; | 1376 return message; |
| 1381 } | 1377 } |
| 1382 | 1378 |
| 1383 } // namespace ports | 1379 } // namespace ports |
| 1384 } // namespace edk | 1380 } // namespace edk |
| 1385 } // namespace mojo | 1381 } // namespace mojo |
| OLD | NEW |