| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/system/channel_endpoint.h" | |
| 6 | |
| 7 #include "base/logging.h" | |
| 8 #include "mojo/system/channel.h" | |
| 9 #include "mojo/system/message_pipe.h" | |
| 10 #include "mojo/system/transport_data.h" | |
| 11 | |
| 12 namespace mojo { | |
| 13 namespace system { | |
| 14 | |
| 15 ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port) | |
| 16 : state_(STATE_NORMAL), | |
| 17 message_pipe_(message_pipe), | |
| 18 port_(port), | |
| 19 channel_(), | |
| 20 local_id_(MessageInTransit::kInvalidEndpointId), | |
| 21 remote_id_(MessageInTransit::kInvalidEndpointId) { | |
| 22 DCHECK(message_pipe_.get()); | |
| 23 DCHECK(port_ == 0 || port_ == 1); | |
| 24 } | |
| 25 | |
| 26 void ChannelEndpoint::TakeMessages(MessageInTransitQueue* message_queue) { | |
| 27 DCHECK(paused_message_queue_.IsEmpty()); | |
| 28 paused_message_queue_.Swap(message_queue); | |
| 29 } | |
| 30 | |
| 31 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) { | |
| 32 DCHECK(message); | |
| 33 | |
| 34 base::AutoLock locker(lock_); | |
| 35 | |
| 36 if (!channel_ || remote_id_ == MessageInTransit::kInvalidEndpointId) { | |
| 37 // We may reach here if we haven't been attached or run yet. | |
| 38 // TODO(vtl): We may also reach here if the channel is shut down early for | |
| 39 // some reason (with live message pipes on it). We can't check |state_| yet, | |
| 40 // until it's protected under lock, but in this case we should return false | |
| 41 // (and not enqueue any messages). | |
| 42 paused_message_queue_.AddMessage(message.Pass()); | |
| 43 return true; | |
| 44 } | |
| 45 | |
| 46 // TODO(vtl): Currently, this only works in the "running" case. | |
| 47 DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId); | |
| 48 | |
| 49 return WriteMessageNoLock(message.Pass()); | |
| 50 } | |
| 51 | |
| 52 void ChannelEndpoint::DetachFromMessagePipe() { | |
| 53 // TODO(vtl): Once |message_pipe_| is under |lock_|, we should null it out | |
| 54 // here. For now, get the channel to do so for us. | |
| 55 | |
| 56 scoped_refptr<Channel> channel; | |
| 57 { | |
| 58 base::AutoLock locker(lock_); | |
| 59 DCHECK(message_pipe_.get()); | |
| 60 message_pipe_ = nullptr; | |
| 61 | |
| 62 if (!channel_) | |
| 63 return; | |
| 64 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | |
| 65 // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid | |
| 66 // here as well. | |
| 67 channel = channel_; | |
| 68 } | |
| 69 // Don't call this under |lock_|, since it'll call us back. | |
| 70 // TODO(vtl): This seems pretty suboptimal. | |
| 71 channel->DetachMessagePipeEndpoint(local_id_, remote_id_); | |
| 72 } | |
| 73 | |
| 74 void ChannelEndpoint::AttachToChannel(Channel* channel, | |
| 75 MessageInTransit::EndpointId local_id) { | |
| 76 DCHECK(channel); | |
| 77 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | |
| 78 | |
| 79 base::AutoLock locker(lock_); | |
| 80 DCHECK(!channel_); | |
| 81 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | |
| 82 channel_ = channel; | |
| 83 local_id_ = local_id; | |
| 84 } | |
| 85 | |
| 86 void ChannelEndpoint::Run(MessageInTransit::EndpointId remote_id) { | |
| 87 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); | |
| 88 | |
| 89 base::AutoLock locker(lock_); | |
| 90 DCHECK(channel_); | |
| 91 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | |
| 92 remote_id_ = remote_id; | |
| 93 | |
| 94 while (!paused_message_queue_.IsEmpty()) { | |
| 95 LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage())) | |
| 96 << "Failed to write enqueue message to channel"; | |
| 97 } | |
| 98 } | |
| 99 | |
| 100 bool ChannelEndpoint::OnReadMessage( | |
| 101 const MessageInTransit::View& message_view, | |
| 102 embedder::ScopedPlatformHandleVectorPtr platform_handles) { | |
| 103 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | |
| 104 scoped_refptr<MessagePipe> message_pipe; | |
| 105 unsigned port; | |
| 106 { | |
| 107 base::AutoLock locker(lock_); | |
| 108 DCHECK(channel_); | |
| 109 if (!message_pipe_.get()) { | |
| 110 // This isn't a failure per se. (It just means that, e.g., the other end | |
| 111 // of the message point closed first.) | |
| 112 return true; | |
| 113 } | |
| 114 | |
| 115 if (message_view.transport_data_buffer_size() > 0) { | |
| 116 DCHECK(message_view.transport_data_buffer()); | |
| 117 message->SetDispatchers(TransportData::DeserializeDispatchers( | |
| 118 message_view.transport_data_buffer(), | |
| 119 message_view.transport_data_buffer_size(), | |
| 120 platform_handles.Pass(), | |
| 121 channel_)); | |
| 122 } | |
| 123 | |
| 124 // Take a ref, and call |EnqueueMessage()| outside the lock. | |
| 125 message_pipe = message_pipe_; | |
| 126 port = port_; | |
| 127 } | |
| 128 | |
| 129 MojoResult result = message_pipe->EnqueueMessage( | |
| 130 MessagePipe::GetPeerPort(port), message.Pass()); | |
| 131 return (result == MOJO_RESULT_OK); | |
| 132 } | |
| 133 | |
| 134 void ChannelEndpoint::OnDisconnect() { | |
| 135 scoped_refptr<MessagePipe> message_pipe; | |
| 136 unsigned port; | |
| 137 { | |
| 138 base::AutoLock locker(lock_); | |
| 139 if (!message_pipe_.get()) | |
| 140 return; | |
| 141 | |
| 142 // Take a ref, and call |Close()| outside the lock. | |
| 143 message_pipe = message_pipe_; | |
| 144 port = port_; | |
| 145 } | |
| 146 message_pipe->Close(port); | |
| 147 } | |
| 148 | |
| 149 void ChannelEndpoint::DetachFromChannel() { | |
| 150 base::AutoLock locker(lock_); | |
| 151 DCHECK(channel_); | |
| 152 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | |
| 153 // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid | |
| 154 // here as well. | |
| 155 channel_ = nullptr; | |
| 156 local_id_ = MessageInTransit::kInvalidEndpointId; | |
| 157 remote_id_ = MessageInTransit::kInvalidEndpointId; | |
| 158 } | |
| 159 | |
| 160 ChannelEndpoint::~ChannelEndpoint() { | |
| 161 DCHECK(!message_pipe_.get()); | |
| 162 DCHECK(!channel_); | |
| 163 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | |
| 164 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | |
| 165 } | |
| 166 | |
| 167 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) { | |
| 168 DCHECK(message); | |
| 169 | |
| 170 lock_.AssertAcquired(); | |
| 171 | |
| 172 DCHECK(channel_); | |
| 173 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | |
| 174 DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId); | |
| 175 | |
| 176 message->SerializeAndCloseDispatchers(channel_); | |
| 177 message->set_source_id(local_id_); | |
| 178 message->set_destination_id(remote_id_); | |
| 179 return channel_->WriteMessage(message.Pass()); | |
| 180 } | |
| 181 | |
| 182 } // namespace system | |
| 183 } // namespace mojo | |
| OLD | NEW |