| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/proxy_message_pipe_endpoint.h" | 5 #include "mojo/system/proxy_message_pipe_endpoint.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "mojo/system/channel.h" | 10 #include "mojo/system/channel.h" |
| 11 #include "mojo/system/local_message_pipe_endpoint.h" | 11 #include "mojo/system/local_message_pipe_endpoint.h" |
| 12 #include "mojo/system/message_pipe_dispatcher.h" | 12 #include "mojo/system/message_pipe_dispatcher.h" |
| 13 | 13 |
| 14 namespace mojo { | 14 namespace mojo { |
| 15 namespace system { | 15 namespace system { |
| 16 | 16 |
| 17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() | 17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() |
| 18 : local_id_(MessageInTransit::kInvalidEndpointId), | 18 : local_id_(MessageInTransit::kInvalidEndpointId), |
| 19 remote_id_(MessageInTransit::kInvalidEndpointId), | 19 remote_id_(MessageInTransit::kInvalidEndpointId), |
| 20 is_open_(true), | |
| 21 is_peer_open_(true) { | 20 is_peer_open_(true) { |
| 22 } | 21 } |
| 23 | 22 |
| 24 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( | 23 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( |
| 25 LocalMessagePipeEndpoint* local_message_pipe_endpoint, | 24 LocalMessagePipeEndpoint* local_message_pipe_endpoint, |
| 26 bool is_peer_open) | 25 bool is_peer_open) |
| 27 : local_id_(MessageInTransit::kInvalidEndpointId), | 26 : local_id_(MessageInTransit::kInvalidEndpointId), |
| 28 remote_id_(MessageInTransit::kInvalidEndpointId), | 27 remote_id_(MessageInTransit::kInvalidEndpointId), |
| 29 is_open_(true), | |
| 30 is_peer_open_(is_peer_open), | 28 is_peer_open_(is_peer_open), |
| 31 paused_message_queue_(MessageInTransitQueue::PassContents(), | 29 paused_message_queue_(MessageInTransitQueue::PassContents(), |
| 32 local_message_pipe_endpoint->message_queue()) { | 30 local_message_pipe_endpoint->message_queue()) { |
| 33 local_message_pipe_endpoint->Close(); | 31 local_message_pipe_endpoint->Close(); |
| 34 } | 32 } |
| 35 | 33 |
| 36 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { | 34 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { |
| 37 DCHECK(!is_running()); | 35 DCHECK(!is_running()); |
| 38 DCHECK(!is_attached()); | 36 DCHECK(!is_attached()); |
| 39 AssertConsistentState(); | 37 AssertConsistentState(); |
| 40 DCHECK(paused_message_queue_.IsEmpty()); | 38 DCHECK(paused_message_queue_.IsEmpty()); |
| 41 } | 39 } |
| 42 | 40 |
| 43 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { | 41 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { |
| 44 return kTypeProxy; | 42 return kTypeProxy; |
| 45 } | 43 } |
| 46 | 44 |
| 47 void ProxyMessagePipeEndpoint::Close() { | 45 bool ProxyMessagePipeEndpoint::OnPeerClose() { |
| 48 DCHECK(is_open_); | |
| 49 is_open_ = false; | |
| 50 | |
| 51 DCHECK(is_attached()); | |
| 52 channel_->DetachMessagePipeEndpoint(local_id_); | |
| 53 channel_ = NULL; | |
| 54 local_id_ = MessageInTransit::kInvalidEndpointId; | |
| 55 remote_id_ = MessageInTransit::kInvalidEndpointId; | |
| 56 paused_message_queue_.Clear(); | |
| 57 } | |
| 58 | |
| 59 void ProxyMessagePipeEndpoint::OnPeerClose() { | |
| 60 DCHECK(is_open_); | |
| 61 DCHECK(is_peer_open_); | 46 DCHECK(is_peer_open_); |
| 62 | 47 |
| 63 is_peer_open_ = false; | 48 is_peer_open_ = false; |
| 64 EnqueueMessage(make_scoped_ptr( | 49 |
| 65 new MessageInTransit(MessageInTransit::kTypeMessagePipe, | 50 // If our outgoing message queue isn't empty, we shouldn't be destroyed yet. |
| 66 MessageInTransit::kSubtypeMessagePipePeerClosed, | 51 if (!paused_message_queue_.IsEmpty()) |
| 67 0, 0, NULL))); | 52 return true; |
| 53 |
| 54 if (is_attached()) { |
| 55 if (!is_running()) { |
| 56 // If we're not running yet, we can't be destroyed yet, because we're |
| 57 // still waiting for the "run" message from the other side. |
| 58 return true; |
| 59 } |
| 60 |
| 61 Detach(); |
| 62 } |
| 63 |
| 64 return false; |
| 68 } | 65 } |
| 69 | 66 |
| 70 // Note: We may have to enqueue messages even when our (local) peer isn't open | 67 // Note: We may have to enqueue messages even when our (local) peer isn't open |
| 71 // -- it may have been written to and closed immediately, before we were ready. | 68 // -- it may have been written to and closed immediately, before we were ready. |
| 72 // This case is handled in |Run()| (which will call us). | 69 // This case is handled in |Run()| (which will call us). |
| 73 void ProxyMessagePipeEndpoint::EnqueueMessage( | 70 void ProxyMessagePipeEndpoint::EnqueueMessage( |
| 74 scoped_ptr<MessageInTransit> message) { | 71 scoped_ptr<MessageInTransit> message) { |
| 75 DCHECK(is_open_); | |
| 76 | |
| 77 if (is_running()) { | 72 if (is_running()) { |
| 78 message->SerializeAndCloseDispatchers(channel_.get()); | 73 message->SerializeAndCloseDispatchers(channel_.get()); |
| 79 | 74 |
| 80 message->set_source_id(local_id_); | 75 message->set_source_id(local_id_); |
| 81 message->set_destination_id(remote_id_); | 76 message->set_destination_id(remote_id_); |
| 82 if (!channel_->WriteMessage(message.Pass())) | 77 if (!channel_->WriteMessage(message.Pass())) |
| 83 LOG(WARNING) << "Failed to write message to channel"; | 78 LOG(WARNING) << "Failed to write message to channel"; |
| 84 } else { | 79 } else { |
| 85 paused_message_queue_.AddMessage(message.Pass()); | 80 paused_message_queue_.AddMessage(message.Pass()); |
| 86 } | 81 } |
| 87 } | 82 } |
| 88 | 83 |
| 89 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, | 84 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, |
| 90 MessageInTransit::EndpointId local_id) { | 85 MessageInTransit::EndpointId local_id) { |
| 91 DCHECK(channel.get()); | 86 DCHECK(channel.get()); |
| 92 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | 87 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
| 93 | 88 |
| 94 DCHECK(!is_attached()); | 89 DCHECK(!is_attached()); |
| 95 | 90 |
| 96 AssertConsistentState(); | 91 AssertConsistentState(); |
| 97 channel_ = channel; | 92 channel_ = channel; |
| 98 local_id_ = local_id; | 93 local_id_ = local_id; |
| 99 AssertConsistentState(); | 94 AssertConsistentState(); |
| 100 } | 95 } |
| 101 | 96 |
| 102 void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { | 97 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { |
| 103 // Assertions about arguments: | 98 // Assertions about arguments: |
| 104 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); | 99 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); |
| 105 | 100 |
| 106 // Assertions about current state: | 101 // Assertions about current state: |
| 107 DCHECK(is_attached()); | 102 DCHECK(is_attached()); |
| 108 DCHECK(!is_running()); | 103 DCHECK(!is_running()); |
| 109 | 104 |
| 110 AssertConsistentState(); | 105 AssertConsistentState(); |
| 111 remote_id_ = remote_id; | 106 remote_id_ = remote_id; |
| 112 AssertConsistentState(); | 107 AssertConsistentState(); |
| 113 | 108 |
| 114 while (!paused_message_queue_.IsEmpty()) | 109 while (!paused_message_queue_.IsEmpty()) |
| 115 EnqueueMessage(paused_message_queue_.GetMessage()); | 110 EnqueueMessage(paused_message_queue_.GetMessage()); |
| 111 |
| 112 if (is_peer_open_) |
| 113 return true; // Stay alive. |
| 114 |
| 115 // We were just waiting to die. |
| 116 Detach(); |
| 117 return false; |
| 118 } |
| 119 |
| 120 void ProxyMessagePipeEndpoint::OnRemove() { |
| 121 Detach(); |
| 122 } |
| 123 |
| 124 void ProxyMessagePipeEndpoint::Detach() { |
| 125 DCHECK(is_attached()); |
| 126 |
| 127 AssertConsistentState(); |
| 128 channel_->DetachMessagePipeEndpoint(local_id_, remote_id_); |
| 129 channel_ = NULL; |
| 130 local_id_ = MessageInTransit::kInvalidEndpointId; |
| 131 remote_id_ = MessageInTransit::kInvalidEndpointId; |
| 132 paused_message_queue_.Clear(); |
| 133 AssertConsistentState(); |
| 116 } | 134 } |
| 117 | 135 |
| 118 #ifndef NDEBUG | 136 #ifndef NDEBUG |
| 119 void ProxyMessagePipeEndpoint::AssertConsistentState() const { | 137 void ProxyMessagePipeEndpoint::AssertConsistentState() const { |
| 120 if (is_attached()) { | 138 if (is_attached()) { |
| 121 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | 139 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); |
| 122 } else { // Not attached. | 140 } else { // Not attached. |
| 123 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | 141 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); |
| 124 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | 142 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); |
| 125 } | 143 } |
| 126 } | 144 } |
| 127 #endif | 145 #endif |
| 128 | 146 |
| 129 } // namespace system | 147 } // namespace system |
| 130 } // namespace mojo | 148 } // namespace mojo |
| OLD | NEW |