Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Side by Side Diff: mojo/edk/system/ports/node.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/ports/node.h"
6
7 #include <string.h>
8
9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/synchronization/lock.h"
12 #include "mojo/edk/system/ports/node_delegate.h"
13
14 namespace mojo {
15 namespace edk {
16 namespace ports {
17
18 namespace {
19
20 int DebugError(const char* message, int error_code) {
21 CHECK(false) << "Oops: " << message;
22 return error_code;
23 }
24
25 #define OOPS(x) DebugError(#x, x)
26
27 bool CanAcceptMoreMessages(const Port* port) {
28 // Have we already doled out the last message (i.e., do we expect to NOT
29 // receive further messages)?
30 uint64_t next_sequence_num = port->message_queue.next_sequence_num();
31 if (port->peer_closed || port->remove_proxy_on_last_message) {
32 if (port->last_sequence_num_to_receive == next_sequence_num - 1)
33 return false;
34 }
35 return true;
36 }
37
38 } // namespace
39
40 Node::Node(const NodeName& name, NodeDelegate* delegate)
41 : name_(name),
42 delegate_(delegate) {
43 }
44
45 Node::~Node() {
46 if (!ports_.empty())
47 DLOG(WARNING) << "Unclean shutdown for node " << name_;
48 }
49
50 bool Node::CanShutdownCleanly(bool allow_local_ports) {
51 base::AutoLock ports_lock(ports_lock_);
52
53 if (!allow_local_ports) {
54 #if !defined(NDEBUG)
55 for (auto entry : ports_) {
56 DVLOG(2) << "Port " << entry.first << " referencing node "
57 << entry.second->peer_node_name << " is blocking shutdown of "
58 << "node " << name_ << " (state=" << entry.second->state << ")";
59 }
60 #endif
61 return ports_.empty();
62 }
63
64 // NOTE: This is not efficient, though it probably doesn't need to be since
65 // relatively few ports should be open during shutdown and shutdown doesn't
66 // need to be blazingly fast.
67 bool can_shutdown = true;
68 for (auto entry : ports_) {
69 base::AutoLock lock(entry.second->lock);
70 if (entry.second->peer_node_name != name_ &&
71 entry.second->state != Port::kReceiving) {
72 can_shutdown = false;
73 #if !defined(NDEBUG)
74 DVLOG(2) << "Port " << entry.first << " referencing node "
75 << entry.second->peer_node_name << " is blocking shutdown of "
76 << "node " << name_ << " (state=" << entry.second->state << ")";
77 #else
78 // Exit early when not debugging.
79 break;
80 #endif
81 }
82 }
83
84 return can_shutdown;
85 }
86
87 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
88 scoped_refptr<Port> port = GetPort(port_name);
89 if (!port)
90 return ERROR_PORT_UNKNOWN;
91
92 *port_ref = PortRef(port_name, std::move(port));
93 return OK;
94 }
95
96 int Node::CreateUninitializedPort(PortRef* port_ref) {
97 PortName port_name;
98 delegate_->GenerateRandomPortName(&port_name);
99
100 scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum,
101 kInitialSequenceNum));
102 int rv = AddPortWithName(port_name, port);
103 if (rv != OK)
104 return rv;
105
106 *port_ref = PortRef(port_name, std::move(port));
107 return OK;
108 }
109
110 int Node::InitializePort(const PortRef& port_ref,
111 const NodeName& peer_node_name,
112 const PortName& peer_port_name) {
113 Port* port = port_ref.port();
114
115 {
116 base::AutoLock lock(port->lock);
117 if (port->state != Port::kUninitialized)
118 return ERROR_PORT_STATE_UNEXPECTED;
119
120 port->state = Port::kReceiving;
121 port->peer_node_name = peer_node_name;
122 port->peer_port_name = peer_port_name;
123 }
124
125 delegate_->PortStatusChanged(port_ref);
126
127 return OK;
128 }
129
130 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
131 int rv;
132
133 rv = CreateUninitializedPort(port0_ref);
134 if (rv != OK)
135 return rv;
136
137 rv = CreateUninitializedPort(port1_ref);
138 if (rv != OK)
139 return rv;
140
141 rv = InitializePort(*port0_ref, name_, port1_ref->name());
142 if (rv != OK)
143 return rv;
144
145 rv = InitializePort(*port1_ref, name_, port0_ref->name());
146 if (rv != OK)
147 return rv;
148
149 return OK;
150 }
151
152 int Node::SetUserData(const PortRef& port_ref,
153 const scoped_refptr<UserData>& user_data) {
154 Port* port = port_ref.port();
155
156 base::AutoLock lock(port->lock);
157 if (port->state == Port::kClosed)
158 return ERROR_PORT_STATE_UNEXPECTED;
159
160 port->user_data = std::move(user_data);
161
162 return OK;
163 }
164
165 int Node::GetUserData(const PortRef& port_ref,
166 scoped_refptr<UserData>* user_data) {
167 Port* port = port_ref.port();
168
169 base::AutoLock lock(port->lock);
170 if (port->state == Port::kClosed)
171 return ERROR_PORT_STATE_UNEXPECTED;
172
173 *user_data = port->user_data;
174
175 return OK;
176 }
177
178 int Node::ClosePort(const PortRef& port_ref) {
179 std::deque<PortName> referenced_port_names;
180
181 ObserveClosureEventData data;
182
183 NodeName peer_node_name;
184 PortName peer_port_name;
185 Port* port = port_ref.port();
186 {
187 // We may need to erase the port, which requires ports_lock_ to be held,
188 // but ports_lock_ must be acquired before any individual port locks.
189 base::AutoLock ports_lock(ports_lock_);
190
191 base::AutoLock lock(port->lock);
192 if (port->state == Port::kUninitialized) {
193 // If the port was not yet initialized, there's nothing interesting to do.
194 ErasePort_Locked(port_ref.name());
195 return OK;
196 }
197
198 if (port->state != Port::kReceiving)
199 return ERROR_PORT_STATE_UNEXPECTED;
200
201 port->state = Port::kClosed;
202
203 // We pass along the sequence number of the last message sent from this
204 // port to allow the peer to have the opportunity to consume all inbound
205 // messages before notifying the embedder that this port is closed.
206 data.last_sequence_num = port->next_sequence_num_to_send - 1;
207
208 peer_node_name = port->peer_node_name;
209 peer_port_name = port->peer_port_name;
210
211 // If the port being closed still has unread messages, then we need to take
212 // care to close those ports so as to avoid leaking memory.
213 port->message_queue.GetReferencedPorts(&referenced_port_names);
214 }
215
216 DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
217 << " to " << peer_port_name << "@" << peer_node_name;
218
219 ErasePort(port_ref.name());
220
221 delegate_->ForwardMessage(
222 peer_node_name,
223 NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));
224
225 for (const auto& name : referenced_port_names) {
226 PortRef ref;
227 if (GetPort(name, &ref) == OK)
228 ClosePort(ref);
229 }
230 return OK;
231 }
232
233 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
234 Port* port = port_ref.port();
235
236 base::AutoLock lock(port->lock);
237
238 if (port->state != Port::kReceiving)
239 return ERROR_PORT_STATE_UNEXPECTED;
240
241 port_status->has_messages = port->message_queue.HasNextMessage();
242 port_status->receiving_messages = CanAcceptMoreMessages(port);
243 port_status->peer_closed = port->peer_closed;
244 return OK;
245 }
246
247 int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
248 return GetMessageIf(port_ref, nullptr, message);
249 }
250
251 int Node::GetMessageIf(const PortRef& port_ref,
252 std::function<bool(const Message&)> selector,
253 ScopedMessage* message) {
254 *message = nullptr;
255
256 DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;
257
258 Port* port = port_ref.port();
259 {
260 base::AutoLock lock(port->lock);
261
262 // This could also be treated like the port being unknown since the
263 // embedder should no longer be referring to a port that has been sent.
264 if (port->state != Port::kReceiving)
265 return ERROR_PORT_STATE_UNEXPECTED;
266
267 // Let the embedder get messages until there are no more before reporting
268 // that the peer closed its end.
269 if (!CanAcceptMoreMessages(port))
270 return ERROR_PORT_PEER_CLOSED;
271
272 port->message_queue.GetNextMessageIf(selector, message);
273 }
274
275 // Allow referenced ports to trigger PortStatusChanged calls.
276 if (*message) {
277 for (size_t i = 0; i < (*message)->num_ports(); ++i) {
278 const PortName& new_port_name = (*message)->ports()[i];
279 scoped_refptr<Port> new_port = GetPort(new_port_name);
280
281 DCHECK(new_port) << "Port " << new_port_name << "@" << name_
282 << " does not exist!";
283
284 base::AutoLock lock(new_port->lock);
285
286 DCHECK(new_port->state == Port::kReceiving);
287 new_port->message_queue.set_signalable(true);
288 }
289 }
290
291 return OK;
292 }
293
294 int Node::SendMessage(const PortRef& port_ref, ScopedMessage* message) {
295 ScopedMessage& m = *message;
296 for (size_t i = 0; i < m->num_ports(); ++i) {
297 if (m->ports()[i] == port_ref.name())
298 return ERROR_PORT_CANNOT_SEND_SELF;
299 }
300
301 Port* port = port_ref.port();
302 {
303 // We must acquire |ports_lock_| before grabbing any port locks, because
304 // WillSendMessage_Locked may need to lock multiple ports out of order.
305 base::AutoLock ports_lock(ports_lock_);
306 base::AutoLock lock(port->lock);
307
308 if (port->state != Port::kReceiving)
309 return ERROR_PORT_STATE_UNEXPECTED;
310
311 if (port->peer_closed)
312 return ERROR_PORT_PEER_CLOSED;
313
314 int rv = WillSendMessage_Locked(port, port_ref.name(), m.get());
315 if (rv != OK)
316 return rv;
317
318 // Beyond this point there's no sense in returning anything but OK. Even if
319 // message forwarding or acceptance fails, there's nothing the embedder can
320 // do to recover. Assume that failure beyond this point must be treated as a
321 // transport failure.
322
323 if (port->peer_node_name != name_) {
324 delegate_->ForwardMessage(port->peer_node_name, std::move(m));
325 return OK;
326 }
327 }
328
329 int rv = AcceptMessage(std::move(m));
330 if (rv != OK) {
331 // See comment above for why we don't return an error in this case.
332 DVLOG(2) << "AcceptMessage failed: " << rv;
333 }
334
335 return OK;
336 }
337
338 int Node::AcceptMessage(ScopedMessage message) {
339 const EventHeader* header = GetEventHeader(*message);
340 switch (header->type) {
341 case EventType::kUser:
342 return OnUserMessage(std::move(message));
343 case EventType::kPortAccepted:
344 return OnPortAccepted(header->port_name);
345 case EventType::kObserveProxy:
346 return OnObserveProxy(
347 header->port_name,
348 *GetEventData<ObserveProxyEventData>(*message));
349 case EventType::kObserveProxyAck:
350 return OnObserveProxyAck(
351 header->port_name,
352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
353 case EventType::kObserveClosure:
354 return OnObserveClosure(
355 header->port_name,
356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
357 }
358 return OOPS(ERROR_NOT_IMPLEMENTED);
359 }
360
361 int Node::LostConnectionToNode(const NodeName& node_name) {
362 // We can no longer send events to the given node. We also can't expect any
363 // PortAccepted events.
364
365 DVLOG(1) << "Observing lost connection from node " << name_
366 << " to node " << node_name;
367
368 std::vector<PortRef> ports_to_notify;
369
370 {
371 base::AutoLock ports_lock(ports_lock_);
372
373 for (auto iter = ports_.begin(); iter != ports_.end(); ) {
374 scoped_refptr<Port>& port = iter->second;
375
376 bool remove_port = false;
377 {
378 base::AutoLock port_lock(port->lock);
379
380 if (port->peer_node_name == node_name) {
381 // We can no longer send messages to this port's peer. We assume we
382 // will not receive any more messages from this port's peer as well.
383 if (!port->peer_closed) {
384 port->peer_closed = true;
385 port->last_sequence_num_to_receive =
386 port->message_queue.next_sequence_num() - 1;
387
388 if (port->state == Port::kReceiving)
389 ports_to_notify.push_back(PortRef(iter->first, port));
390 }
391
392 // We do not expect to forward any further messages, and we do not
393 // expect to receive a Port{Accepted,Rejected} event.
394 if (port->state != Port::kReceiving)
395 remove_port = true;
396 }
397 }
398
399 if (remove_port) {
400 DVLOG(2) << "Deleted port " << iter->first << "@" << name_;
401 iter = ports_.erase(iter);
402 } else {
403 ++iter;
404 }
405 }
406 }
407
408 for (size_t i = 0; i < ports_to_notify.size(); ++i)
409 delegate_->PortStatusChanged(ports_to_notify[i]);
410
411 return OK;
412 }
413
414 int Node::OnUserMessage(ScopedMessage message) {
415 PortName port_name = GetEventHeader(*message)->port_name;
416 const auto* event = GetEventData<UserEventData>(*message);
417
418 #if !defined(NDEBUG)
419 std::ostringstream ports_buf;
420 for (size_t i = 0; i < message->num_ports(); ++i) {
421 if (i > 0)
422 ports_buf << ",";
423 ports_buf << message->ports()[i];
424 }
425
426 DVLOG(2) << "AcceptMessage " << event->sequence_num
427 << " [ports=" << ports_buf.str() << "] at "
428 << port_name << "@" << name_;
429 #endif
430
431 scoped_refptr<Port> port = GetPort(port_name);
432
433 // Even if this port does not exist, cannot receive anymore messages or is
434 // buffering or proxying messages, we still need these ports to be bound to
435 // this node. When the message is forwarded, these ports will get transferred
436 // following the usual method. If the message cannot be accepted, then the
437 // newly bound ports will simply be closed.
438
439 for (size_t i = 0; i < message->num_ports(); ++i) {
440 int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
441 if (rv != OK)
442 return rv;
443 }
444
445 bool has_next_message = false;
446 bool message_accepted = false;
447
448 if (port) {
449 // We may want to forward messages once the port lock is held, so we must
450 // acquire |ports_lock_| first.
451 base::AutoLock ports_lock(ports_lock_);
452 base::AutoLock lock(port->lock);
453
454 // Reject spurious messages if we've already received the last expected
455 // message.
456 if (CanAcceptMoreMessages(port.get())) {
457 message_accepted = true;
458 port->message_queue.AcceptMessage(std::move(message), &has_next_message);
459
460 if (port->state == Port::kBuffering) {
461 has_next_message = false;
462 } else if (port->state == Port::kProxying) {
463 has_next_message = false;
464
465 // Forward messages. We forward messages in sequential order here so
466 // that we maintain the message queue's notion of next sequence number.
467 // That's useful for the proxy removal process as we can tell when this
468 // port has seen all of the messages it is expected to see.
469 int rv = ForwardMessages_Locked(port.get(), port_name);
470 if (rv != OK)
471 return rv;
472
473 MaybeRemoveProxy_Locked(port.get(), port_name);
474 }
475 }
476 }
477
478 if (!message_accepted) {
479 DVLOG(2) << "Message not accepted!\n";
480 // Close all newly accepted ports as they are effectively orphaned.
481 for (size_t i = 0; i < message->num_ports(); ++i) {
482 PortRef port_ref;
483 if (GetPort(message->ports()[i], &port_ref) == OK) {
484 ClosePort(port_ref);
485 } else {
486 DLOG(WARNING) << "Cannot close non-existent port!\n";
487 }
488 }
489 } else if (has_next_message) {
490 PortRef port_ref(port_name, port);
491 delegate_->PortStatusChanged(port_ref);
492 }
493
494 return OK;
495 }
496
497 int Node::OnPortAccepted(const PortName& port_name) {
498 scoped_refptr<Port> port = GetPort(port_name);
499 if (!port)
500 return OOPS(ERROR_PORT_UNKNOWN);
501
502 {
503 // We must hold |ports_lock_| before grabbing the port lock because
504 // ForwardMessages_Locked requires it to be held.
505 base::AutoLock ports_lock(ports_lock_);
506 base::AutoLock lock(port->lock);
507
508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
509 << " pointing to "
510 << port->peer_port_name << "@" << port->peer_node_name;
511
512 if (port->state != Port::kBuffering)
513 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
514
515 port->state = Port::kProxying;
516
517 int rv = ForwardMessages_Locked(port.get(), port_name);
518 if (rv != OK)
519 return rv;
520
521 // We may have observed closure before receiving PortAccepted. In that
522 // case, we can advance to removing the proxy without sending out an
523 // ObserveProxy message. We already know the last expected message, etc.
524
525 if (port->remove_proxy_on_last_message) {
526 MaybeRemoveProxy_Locked(port.get(), port_name);
527
528 // Make sure we propagate closure to our current peer.
529 ObserveClosureEventData data;
530 data.last_sequence_num = port->last_sequence_num_to_receive;
531 delegate_->ForwardMessage(
532 port->peer_node_name,
533 NewInternalMessage(port->peer_port_name,
534 EventType::kObserveClosure, data));
535 } else {
536 InitiateProxyRemoval_Locked(port.get(), port_name);
537 }
538 }
539 return OK;
540 }
541
542 int Node::OnObserveProxy(const PortName& port_name,
543 const ObserveProxyEventData& event) {
544 // The port may have already been closed locally, in which case the
545 // ObserveClosure message will contain the last_sequence_num field.
546 // We can then silently ignore this message.
547 scoped_refptr<Port> port = GetPort(port_name);
548 if (!port) {
549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
550 return OK;
551 }
552
553 DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
554 << event.proxy_port_name << "@"
555 << event.proxy_node_name << " pointing to "
556 << event.proxy_to_port_name << "@"
557 << event.proxy_to_node_name;
558
559 {
560 base::AutoLock lock(port->lock);
561
562 if (port->peer_node_name == event.proxy_node_name &&
563 port->peer_port_name == event.proxy_port_name) {
564 if (port->state == Port::kReceiving) {
565 port->peer_node_name = event.proxy_to_node_name;
566 port->peer_port_name = event.proxy_to_port_name;
567
568 ObserveProxyAckEventData ack;
569 ack.last_sequence_num = port->next_sequence_num_to_send - 1;
570
571 delegate_->ForwardMessage(
572 event.proxy_node_name,
573 NewInternalMessage(event.proxy_port_name,
574 EventType::kObserveProxyAck,
575 ack));
576 } else {
577 // As a proxy ourselves, we don't know how to honor the ObserveProxy
578 // event or to populate the last_sequence_num field of ObserveProxyAck.
579 // Afterall, another port could be sending messages to our peer now
580 // that we've sent out our own ObserveProxy event. Instead, we will
581 // send an ObserveProxyAck indicating that the ObserveProxy event
582 // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
583 // However, this has to be done after we are removed as a proxy.
584 // Otherwise, we might just find ourselves back here again, which
585 // would be akin to a busy loop.
586
587 DVLOG(2) << "Delaying ObserveProxyAck to "
588 << event.proxy_port_name << "@" << event.proxy_node_name;
589
590 ObserveProxyAckEventData ack;
591 ack.last_sequence_num = kInvalidSequenceNum;
592
593 port->send_on_proxy_removal.reset(
594 new std::pair<NodeName, ScopedMessage>(
595 event.proxy_node_name,
596 NewInternalMessage(event.proxy_port_name,
597 EventType::kObserveProxyAck,
598 ack)));
599 }
600 } else {
601 // Forward this event along to our peer. Eventually, it should find the
602 // port referring to the proxy.
603 delegate_->ForwardMessage(
604 port->peer_node_name,
605 NewInternalMessage(port->peer_port_name,
606 EventType::kObserveProxy,
607 event));
608 }
609 }
610 return OK;
611 }
612
613 int Node::OnObserveProxyAck(const PortName& port_name,
614 uint64_t last_sequence_num) {
615 DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
616 << " (last_sequence_num=" << last_sequence_num << ")";
617
618 scoped_refptr<Port> port = GetPort(port_name);
619 if (!port)
620 return ERROR_PORT_UNKNOWN; // The port may have observed closure first, so
621 // this is not an "Oops".
622
623 {
624 // We must acquire |ports_lock_| before the port lock because it must be
625 // held for MaybeRemoveProxy_Locked.
626 base::AutoLock ports_lock(ports_lock_);
627
628 base::AutoLock lock(port->lock);
629
630 if (port->state != Port::kProxying)
631 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
632
633 if (last_sequence_num == kInvalidSequenceNum) {
634 // Send again.
635 InitiateProxyRemoval_Locked(port.get(), port_name);
636 return OK;
637 }
638
639 // We can now remove this port once we have received and forwarded the last
640 // message addressed to this port.
641 port->remove_proxy_on_last_message = true;
642 port->last_sequence_num_to_receive = last_sequence_num;
643
644 MaybeRemoveProxy_Locked(port.get(), port_name);
645 }
646 return OK;
647 }
648
649 int Node::OnObserveClosure(const PortName& port_name,
650 uint64_t last_sequence_num) {
651 // OK if the port doesn't exist, as it may have been closed already.
652 scoped_refptr<Port> port = GetPort(port_name);
653 if (!port)
654 return OK;
655
656 // This message tells the port that it should no longer expect more messages
657 // beyond last_sequence_num. This message is forwarded along until we reach
658 // the receiving end, and this message serves as an equivalent to
659 // ObserveProxyAck.
660
661 bool notify_delegate = false;
662 {
663 // We must acquire |ports_lock_| before the port lock because it must be
664 // held for MaybeRemoveProxy_Locked.
665 base::AutoLock ports_lock(ports_lock_);
666
667 base::AutoLock lock(port->lock);
668
669 port->peer_closed = true;
670 port->last_sequence_num_to_receive = last_sequence_num;
671
672 DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
673 << " (state=" << port->state << ") pointing to "
674 << port->peer_port_name << "@" << port->peer_node_name
675 << " (last_sequence_num=" << last_sequence_num << ")";
676
677 // We always forward ObserveClosure, even beyond the receiving port which
678 // cares about it. This ensures that any dead-end proxies beyond that port
679 // are notified to remove themselves.
680
681 ObserveClosureEventData forwarded_data;
682
683 if (port->state == Port::kReceiving) {
684 notify_delegate = true;
685
686 // When forwarding along the other half of the port cycle, this will only
687 // reach dead-end proxies. Tell them we've sent our last message so they
688 // can go away.
689 //
690 // TODO: Repurposing ObserveClosure for this has the desired result but
691 // may be semantically confusing since the forwarding port is not actually
692 // closed. Consider replacing this with a new event type.
693 forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
694 } else {
695 // We haven't yet reached the receiving peer of the closed port, so
696 // forward the message along as-is.
697 forwarded_data.last_sequence_num = last_sequence_num;
698
699 // See about removing the port if it is a proxy as our peer won't be able
700 // to participate in proxy removal.
701 port->remove_proxy_on_last_message = true;
702 if (port->state == Port::kProxying)
703 MaybeRemoveProxy_Locked(port.get(), port_name);
704 }
705
706 DVLOG(2) << "Forwarding ObserveClosure from "
707 << port_name << "@" << name_ << " to peer "
708 << port->peer_port_name << "@" << port->peer_node_name
709 << " (last_sequence_num=" << forwarded_data.last_sequence_num
710 << ")";
711
712 delegate_->ForwardMessage(
713 port->peer_node_name,
714 NewInternalMessage(port->peer_port_name,
715 EventType::kObserveClosure, forwarded_data));
716 }
717 if (notify_delegate) {
718 PortRef port_ref(port_name, port);
719 delegate_->PortStatusChanged(port_ref);
720 }
721 return OK;
722 }
723
724 int Node::AddPortWithName(const PortName& port_name,
725 const scoped_refptr<Port>& port) {
726 base::AutoLock lock(ports_lock_);
727
728 if (!ports_.insert(std::make_pair(port_name, port)).second)
729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
730
731 DVLOG(2) << "Created port " << port_name << "@" << name_;
732 return OK;
733 }
734
735 void Node::ErasePort(const PortName& port_name) {
736 base::AutoLock lock(ports_lock_);
737 return ErasePort_Locked(port_name);
738 }
739
740 void Node::ErasePort_Locked(const PortName& port_name) {
741 ports_lock_.AssertAcquired();
742 ports_.erase(port_name);
743 DVLOG(2) << "Deleted port " << port_name << "@" << name_;
744 }
745
746 scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
747 base::AutoLock lock(ports_lock_);
748 return GetPort_Locked(port_name);
749 }
750
751 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
752 ports_lock_.AssertAcquired();
753 auto iter = ports_.find(port_name);
754 if (iter == ports_.end())
755 return nullptr;
756
757 return iter->second;
758 }
759
760 void Node::WillSendPort_Locked(Port* port,
761 const NodeName& to_node_name,
762 PortName* port_name,
763 PortDescriptor* port_descriptor) {
764 ports_lock_.AssertAcquired();
765 port->lock.AssertAcquired();
766
767 PortName local_port_name = *port_name;
768
769 PortName new_port_name;
770 delegate_->GenerateRandomPortName(&new_port_name);
771
772 // Make sure we don't send messages to the new peer until after we know it
773 // exists. In the meantime, just buffer messages locally.
774 DCHECK(port->state == Port::kReceiving);
775 port->state = Port::kBuffering;
776
777 // If we already know our peer is closed, we already know this proxy can
778 // be removed once it receives and forwards its last expected message.
779 if (port->peer_closed)
780 port->remove_proxy_on_last_message = true;
781
782 *port_name = new_port_name;
783
784 port_descriptor->peer_node_name = port->peer_node_name;
785 port_descriptor->peer_port_name = port->peer_port_name;
786 port_descriptor->referring_node_name = name_;
787 port_descriptor->referring_port_name = local_port_name;
788 port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
789 port_descriptor->next_sequence_num_to_receive =
790 port->message_queue.next_sequence_num();
791 port_descriptor->last_sequence_num_to_receive =
792 port->last_sequence_num_to_receive;
793 port_descriptor->peer_closed = port->peer_closed;
794
795 // Configure the local port to point to the new port.
796 port->peer_node_name = to_node_name;
797 port->peer_port_name = new_port_name;
798 }
799
800 int Node::AcceptPort(const PortName& port_name,
801 const PortDescriptor& port_descriptor) {
802 scoped_refptr<Port> port = make_scoped_refptr(
803 new Port(port_descriptor.next_sequence_num_to_send,
804 port_descriptor.next_sequence_num_to_receive));
805 port->state = Port::kReceiving;
806 port->peer_node_name = port_descriptor.peer_node_name;
807 port->peer_port_name = port_descriptor.peer_port_name;
808 port->last_sequence_num_to_receive =
809 port_descriptor.last_sequence_num_to_receive;
810 port->peer_closed = port_descriptor.peer_closed;
811
812 DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
813 << port->peer_closed << "; last_sequence_num_to_receive="
814 << port->last_sequence_num_to_receive << "]";
815
816 // A newly accepted port is not signalable until the message referencing the
817 // new port finds its way to the consumer (see GetMessageIf).
818 port->message_queue.set_signalable(false);
819
820 int rv = AddPortWithName(port_name, port);
821 if (rv != OK)
822 return rv;
823
824 // Allow referring port to forward messages.
825 delegate_->ForwardMessage(
826 port_descriptor.referring_node_name,
827 NewInternalMessage(port_descriptor.referring_port_name,
828 EventType::kPortAccepted));
829 return OK;
830 }
831
832 int Node::WillSendMessage_Locked(Port* port,
833 const PortName& port_name,
834 Message* message) {
835 ports_lock_.AssertAcquired();
836 port->lock.AssertAcquired();
837
838 DCHECK(message);
839
840 // Messages may already have a sequence number if they're being forwarded
841 // by a proxy. Otherwise, use the next outgoing sequence number.
842 uint64_t* sequence_num =
843 &GetMutableEventData<UserEventData>(message)->sequence_num;
844 if (*sequence_num == 0)
845 *sequence_num = port->next_sequence_num_to_send++;
846
847 #if !defined(NDEBUG)
848 std::ostringstream ports_buf;
849 for (size_t i = 0; i < message->num_ports(); ++i) {
850 if (i > 0)
851 ports_buf << ",";
852 ports_buf << message->ports()[i];
853 }
854 #endif
855
856 if (message->num_ports() > 0) {
857 // Note: Another thread could be trying to send the same ports, so we need
858 // to ensure that they are ours to send before we mutate their state.
859
860 std::vector<scoped_refptr<Port>> ports;
861 ports.resize(message->num_ports());
862
863 {
864 for (size_t i = 0; i < message->num_ports(); ++i) {
865 ports[i] = GetPort_Locked(message->ports()[i]);
866 ports[i]->lock.Acquire();
867
868 int error = OK;
869 if (ports[i]->state != Port::kReceiving)
870 error = ERROR_PORT_STATE_UNEXPECTED;
871 else if (message->ports()[i] == port->peer_port_name)
872 error = ERROR_PORT_CANNOT_SEND_PEER;
873
874 if (error != OK) {
875 // Oops, we cannot send this port.
876 for (size_t j = 0; j <= i; ++j)
877 ports[i]->lock.Release();
878 // Backpedal on the sequence number.
879 port->next_sequence_num_to_send--;
880 return error;
881 }
882 }
883 }
884
885 PortDescriptor* port_descriptors =
886 GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
887
888 for (size_t i = 0; i < message->num_ports(); ++i) {
889 WillSendPort_Locked(ports[i].get(),
890 port->peer_node_name,
891 message->mutable_ports() + i,
892 port_descriptors + i);
893 }
894
895 for (size_t i = 0; i < message->num_ports(); ++i)
896 ports[i]->lock.Release();
897 }
898
899 #if !defined(NDEBUG)
900 DVLOG(2) << "Sending message "
901 << GetEventData<UserEventData>(*message)->sequence_num
902 << " [ports=" << ports_buf.str() << "]"
903 << " from " << port_name << "@" << name_
904 << " to " << port->peer_port_name << "@" << port->peer_node_name;
905 #endif
906
907 GetMutableEventHeader(message)->port_name = port->peer_port_name;
908 return OK;
909 }
910
911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
912 ports_lock_.AssertAcquired();
913 port->lock.AssertAcquired();
914
915 for (;;) {
916 ScopedMessage message;
917 port->message_queue.GetNextMessageIf(nullptr, &message);
918 if (!message)
919 break;
920
921 int rv = WillSendMessage_Locked(port, port_name, message.get());
922 if (rv != OK)
923 return rv;
924
925 delegate_->ForwardMessage(port->peer_node_name, std::move(message));
926 }
927 return OK;
928 }
929
930 void Node::InitiateProxyRemoval_Locked(Port* port,
931 const PortName& port_name) {
932 port->lock.AssertAcquired();
933
934 // To remove this node, we start by notifying the connected graph that we are
935 // a proxy. This allows whatever port is referencing this node to skip it.
936 // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
937 // the peer was closed in the meantime).
938
939 ObserveProxyEventData data;
940 data.proxy_node_name = name_;
941 data.proxy_port_name = port_name;
942 data.proxy_to_node_name = port->peer_node_name;
943 data.proxy_to_port_name = port->peer_port_name;
944
945 delegate_->ForwardMessage(
946 port->peer_node_name,
947 NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
948 }
949
950 void Node::MaybeRemoveProxy_Locked(Port* port,
951 const PortName& port_name) {
952 // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
953 ports_lock_.AssertAcquired();
954 port->lock.AssertAcquired();
955
956 DCHECK(port->state == Port::kProxying);
957
958 // Make sure we have seen ObserveProxyAck before removing the port.
959 if (!port->remove_proxy_on_last_message)
960 return;
961
962 if (!CanAcceptMoreMessages(port)) {
963 // This proxy port is done. We can now remove it!
964 ErasePort_Locked(port_name);
965
966 if (port->send_on_proxy_removal) {
967 NodeName to_node = port->send_on_proxy_removal->first;
968 ScopedMessage& message = port->send_on_proxy_removal->second;
969
970 delegate_->ForwardMessage(to_node, std::move(message));
971 }
972 } else {
973 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
974 << " now; waiting for more messages";
975 }
976 }
977
978 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
979 const EventType& type,
980 const void* data,
981 size_t num_data_bytes) {
982 ScopedMessage message;
983 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
984
985 EventHeader* header = GetMutableEventHeader(message.get());
986 header->port_name = port_name;
987 header->type = type;
988 header->padding = 0;
989
990 if (num_data_bytes)
991 memcpy(header + 1, data, num_data_bytes);
992
993 return message;
994 }
995
996 } // namespace ports
997 } // namespace edk
998 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698