| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/proxy_message_pipe_endpoint.h" | 5 #include "mojo/system/proxy_message_pipe_endpoint.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "mojo/system/channel.h" | |
| 11 #include "mojo/system/channel_endpoint.h" | 10 #include "mojo/system/channel_endpoint.h" |
| 12 #include "mojo/system/local_message_pipe_endpoint.h" | 11 #include "mojo/system/local_message_pipe_endpoint.h" |
| 13 #include "mojo/system/message_pipe_dispatcher.h" | 12 #include "mojo/system/message_pipe_dispatcher.h" |
| 14 | 13 |
| 15 namespace mojo { | 14 namespace mojo { |
| 16 namespace system { | 15 namespace system { |
| 17 | 16 |
| 18 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() | 17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() |
| 19 : local_id_(MessageInTransit::kInvalidEndpointId), | 18 : is_running_(false), is_peer_open_(true) { |
| 20 remote_id_(MessageInTransit::kInvalidEndpointId), | |
| 21 is_peer_open_(true) { | |
| 22 } | 19 } |
| 23 | 20 |
| 24 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( | 21 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( |
| 25 LocalMessagePipeEndpoint* local_message_pipe_endpoint, | 22 LocalMessagePipeEndpoint* local_message_pipe_endpoint, |
| 26 bool is_peer_open) | 23 bool is_peer_open) |
| 27 : local_id_(MessageInTransit::kInvalidEndpointId), | 24 : is_running_(false), |
| 28 remote_id_(MessageInTransit::kInvalidEndpointId), | |
| 29 is_peer_open_(is_peer_open), | 25 is_peer_open_(is_peer_open), |
| 30 paused_message_queue_(MessageInTransitQueue::PassContents(), | 26 paused_message_queue_(MessageInTransitQueue::PassContents(), |
| 31 local_message_pipe_endpoint->message_queue()) { | 27 local_message_pipe_endpoint->message_queue()) { |
| 32 local_message_pipe_endpoint->Close(); | 28 local_message_pipe_endpoint->Close(); |
| 33 } | 29 } |
| 34 | 30 |
| 35 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { | 31 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { |
| 36 DCHECK(!is_running()); | 32 DCHECK(!is_running()); |
| 37 DCHECK(!is_attached()); | 33 DCHECK(!is_attached()); |
| 38 AssertConsistentState(); | |
| 39 DCHECK(paused_message_queue_.IsEmpty()); | 34 DCHECK(paused_message_queue_.IsEmpty()); |
| 40 } | 35 } |
| 41 | 36 |
| 42 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { | 37 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { |
| 43 return kTypeProxy; | 38 return kTypeProxy; |
| 44 } | 39 } |
| 45 | 40 |
| 46 bool ProxyMessagePipeEndpoint::OnPeerClose() { | 41 bool ProxyMessagePipeEndpoint::OnPeerClose() { |
| 47 DCHECK(is_peer_open_); | 42 DCHECK(is_peer_open_); |
| 48 | 43 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 64 | 59 |
| 65 return false; | 60 return false; |
| 66 } | 61 } |
| 67 | 62 |
| 68 // Note: We may have to enqueue messages even when our (local) peer isn't open | 63 // Note: We may have to enqueue messages even when our (local) peer isn't open |
| 69 // -- it may have been written to and closed immediately, before we were ready. | 64 // -- it may have been written to and closed immediately, before we were ready. |
| 70 // This case is handled in |Run()| (which will call us). | 65 // This case is handled in |Run()| (which will call us). |
| 71 void ProxyMessagePipeEndpoint::EnqueueMessage( | 66 void ProxyMessagePipeEndpoint::EnqueueMessage( |
| 72 scoped_ptr<MessageInTransit> message) { | 67 scoped_ptr<MessageInTransit> message) { |
| 73 if (is_running()) { | 68 if (is_running()) { |
| 74 message->SerializeAndCloseDispatchers(channel_.get()); | 69 DCHECK(channel_endpoint_.get()); |
| 75 | 70 LOG_IF(WARNING, !channel_endpoint_->EnqueueMessage(message.Pass())) |
| 76 message->set_source_id(local_id_); | 71 << "Failed to write enqueue message to channel"; |
| 77 message->set_destination_id(remote_id_); | |
| 78 if (!channel_->WriteMessage(message.Pass())) | |
| 79 LOG(WARNING) << "Failed to write message to channel"; | |
| 80 } else { | 72 } else { |
| 81 paused_message_queue_.AddMessage(message.Pass()); | 73 paused_message_queue_.AddMessage(message.Pass()); |
| 82 } | 74 } |
| 83 } | 75 } |
| 84 | 76 |
| 85 void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint, | 77 void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint) { |
| 86 Channel* channel, | |
| 87 MessageInTransit::EndpointId local_id) { | |
| 88 DCHECK(channel_endpoint); | 78 DCHECK(channel_endpoint); |
| 89 DCHECK(channel); | |
| 90 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | |
| 91 | |
| 92 DCHECK(!is_attached()); | 79 DCHECK(!is_attached()); |
| 93 | |
| 94 AssertConsistentState(); | |
| 95 channel_endpoint_ = channel_endpoint; | 80 channel_endpoint_ = channel_endpoint; |
| 96 channel_ = channel; | |
| 97 local_id_ = local_id; | |
| 98 AssertConsistentState(); | |
| 99 } | 81 } |
| 100 | 82 |
| 101 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { | 83 bool ProxyMessagePipeEndpoint::Run() { |
| 102 // Assertions about arguments: | |
| 103 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); | |
| 104 | |
| 105 // Assertions about current state: | 84 // Assertions about current state: |
| 106 DCHECK(is_attached()); | 85 DCHECK(is_attached()); |
| 107 DCHECK(!is_running()); | 86 DCHECK(!is_running()); |
| 108 | 87 |
| 109 AssertConsistentState(); | 88 is_running_ = true; |
| 110 remote_id_ = remote_id; | |
| 111 AssertConsistentState(); | |
| 112 | 89 |
| 113 while (!paused_message_queue_.IsEmpty()) | 90 while (!paused_message_queue_.IsEmpty()) { |
| 114 EnqueueMessage(paused_message_queue_.GetMessage()); | 91 LOG_IF( |
| 92 WARNING, |
| 93 !channel_endpoint_->EnqueueMessage(paused_message_queue_.GetMessage())) |
| 94 << "Failed to write enqueue message to channel"; |
| 95 } |
| 115 | 96 |
| 116 if (is_peer_open_) | 97 if (is_peer_open_) |
| 117 return true; // Stay alive. | 98 return true; // Stay alive. |
| 118 | 99 |
| 119 // We were just waiting to die. | 100 // We were just waiting to die. |
| 120 Detach(); | 101 Detach(); |
| 121 return false; | 102 return false; |
| 122 } | 103 } |
| 123 | 104 |
| 124 void ProxyMessagePipeEndpoint::OnRemove() { | 105 void ProxyMessagePipeEndpoint::OnRemove() { |
| 125 Detach(); | 106 Detach(); |
| 126 } | 107 } |
| 127 | 108 |
| 128 void ProxyMessagePipeEndpoint::Detach() { | 109 void ProxyMessagePipeEndpoint::Detach() { |
| 129 DCHECK(is_attached()); | 110 DCHECK(is_attached()); |
| 130 | 111 |
| 131 AssertConsistentState(); | 112 channel_endpoint_->DetachFromMessagePipe(); |
| 132 channel_->DetachMessagePipeEndpoint(local_id_, remote_id_); | |
| 133 channel_ = NULL; | |
| 134 // TODO(vtl): Inform |channel_endpoint_| that we were detached. | |
| 135 channel_endpoint_ = NULL; | 113 channel_endpoint_ = NULL; |
| 136 local_id_ = MessageInTransit::kInvalidEndpointId; | 114 is_running_ = false; |
| 137 remote_id_ = MessageInTransit::kInvalidEndpointId; | |
| 138 paused_message_queue_.Clear(); | 115 paused_message_queue_.Clear(); |
| 139 AssertConsistentState(); | |
| 140 } | 116 } |
| 141 | 117 |
| 142 #ifndef NDEBUG | |
| 143 void ProxyMessagePipeEndpoint::AssertConsistentState() const { | |
| 144 if (is_attached()) { | |
| 145 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | |
| 146 } else { // Not attached. | |
| 147 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | |
| 148 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | |
| 149 } | |
| 150 } | |
| 151 #endif | |
| 152 | |
| 153 } // namespace system | 118 } // namespace system |
| 154 } // namespace mojo | 119 } // namespace mojo |
| OLD | NEW |