OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/system/channel_endpoint.h" | |
6 | |
7 #include "base/logging.h" | |
8 #include "mojo/system/channel.h" | |
9 #include "mojo/system/message_pipe.h" | |
10 #include "mojo/system/transport_data.h" | |
11 | |
12 namespace mojo { | |
13 namespace system { | |
14 | |
15 ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port) | |
16 : state_(STATE_NORMAL), | |
17 message_pipe_(message_pipe), | |
18 port_(port), | |
19 channel_(), | |
20 local_id_(MessageInTransit::kInvalidEndpointId), | |
21 remote_id_(MessageInTransit::kInvalidEndpointId) { | |
22 DCHECK(message_pipe_.get()); | |
23 DCHECK(port_ == 0 || port_ == 1); | |
24 } | |
25 | |
26 void ChannelEndpoint::TakeMessages(MessageInTransitQueue* message_queue) { | |
27 DCHECK(paused_message_queue_.IsEmpty()); | |
28 paused_message_queue_.Swap(message_queue); | |
29 } | |
30 | |
31 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) { | |
32 DCHECK(message); | |
33 | |
34 base::AutoLock locker(lock_); | |
35 | |
36 if (!channel_ || remote_id_ == MessageInTransit::kInvalidEndpointId) { | |
37 // We may reach here if we haven't been attached or run yet. | |
38 // TODO(vtl): We may also reach here if the channel is shut down early for | |
39 // some reason (with live message pipes on it). We can't check |state_| yet, | |
40 // until it's protected under lock, but in this case we should return false | |
41 // (and not enqueue any messages). | |
42 paused_message_queue_.AddMessage(message.Pass()); | |
43 return true; | |
44 } | |
45 | |
46 // TODO(vtl): Currently, this only works in the "running" case. | |
47 DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId); | |
48 | |
49 return WriteMessageNoLock(message.Pass()); | |
50 } | |
51 | |
52 void ChannelEndpoint::DetachFromMessagePipe() { | |
53 // TODO(vtl): Once |message_pipe_| is under |lock_|, we should null it out | |
54 // here. For now, get the channel to do so for us. | |
55 | |
56 scoped_refptr<Channel> channel; | |
57 { | |
58 base::AutoLock locker(lock_); | |
59 DCHECK(message_pipe_.get()); | |
60 message_pipe_ = nullptr; | |
61 | |
62 if (!channel_) | |
63 return; | |
64 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | |
65 // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid | |
66 // here as well. | |
67 channel = channel_; | |
68 } | |
69 // Don't call this under |lock_|, since it'll call us back. | |
70 // TODO(vtl): This seems pretty suboptimal. | |
71 channel->DetachMessagePipeEndpoint(local_id_, remote_id_); | |
72 } | |
73 | |
74 void ChannelEndpoint::AttachToChannel(Channel* channel, | |
75 MessageInTransit::EndpointId local_id) { | |
76 DCHECK(channel); | |
77 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | |
78 | |
79 base::AutoLock locker(lock_); | |
80 DCHECK(!channel_); | |
81 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | |
82 channel_ = channel; | |
83 local_id_ = local_id; | |
84 } | |
85 | |
86 void ChannelEndpoint::Run(MessageInTransit::EndpointId remote_id) { | |
87 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); | |
88 | |
89 base::AutoLock locker(lock_); | |
90 DCHECK(channel_); | |
91 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | |
92 remote_id_ = remote_id; | |
93 | |
94 while (!paused_message_queue_.IsEmpty()) { | |
95 LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage())) | |
96 << "Failed to write enqueue message to channel"; | |
97 } | |
98 } | |
99 | |
100 bool ChannelEndpoint::OnReadMessage( | |
101 const MessageInTransit::View& message_view, | |
102 embedder::ScopedPlatformHandleVectorPtr platform_handles) { | |
103 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | |
104 scoped_refptr<MessagePipe> message_pipe; | |
105 unsigned port; | |
106 { | |
107 base::AutoLock locker(lock_); | |
108 DCHECK(channel_); | |
109 if (!message_pipe_.get()) { | |
110 // This isn't a failure per se. (It just means that, e.g., the other end | |
111 // of the message point closed first.) | |
112 return true; | |
113 } | |
114 | |
115 if (message_view.transport_data_buffer_size() > 0) { | |
116 DCHECK(message_view.transport_data_buffer()); | |
117 message->SetDispatchers(TransportData::DeserializeDispatchers( | |
118 message_view.transport_data_buffer(), | |
119 message_view.transport_data_buffer_size(), | |
120 platform_handles.Pass(), | |
121 channel_)); | |
122 } | |
123 | |
124 // Take a ref, and call |EnqueueMessage()| outside the lock. | |
125 message_pipe = message_pipe_; | |
126 port = port_; | |
127 } | |
128 | |
129 MojoResult result = message_pipe->EnqueueMessage( | |
130 MessagePipe::GetPeerPort(port), message.Pass()); | |
131 return (result == MOJO_RESULT_OK); | |
132 } | |
133 | |
134 void ChannelEndpoint::OnDisconnect() { | |
135 scoped_refptr<MessagePipe> message_pipe; | |
136 unsigned port; | |
137 { | |
138 base::AutoLock locker(lock_); | |
139 if (!message_pipe_.get()) | |
140 return; | |
141 | |
142 // Take a ref, and call |Close()| outside the lock. | |
143 message_pipe = message_pipe_; | |
144 port = port_; | |
145 } | |
146 message_pipe->Close(port); | |
147 } | |
148 | |
149 void ChannelEndpoint::DetachFromChannel() { | |
150 base::AutoLock locker(lock_); | |
151 DCHECK(channel_); | |
152 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | |
153 // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid | |
154 // here as well. | |
155 channel_ = nullptr; | |
156 local_id_ = MessageInTransit::kInvalidEndpointId; | |
157 remote_id_ = MessageInTransit::kInvalidEndpointId; | |
158 } | |
159 | |
160 ChannelEndpoint::~ChannelEndpoint() { | |
161 DCHECK(!message_pipe_.get()); | |
162 DCHECK(!channel_); | |
163 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | |
164 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | |
165 } | |
166 | |
167 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) { | |
168 DCHECK(message); | |
169 | |
170 lock_.AssertAcquired(); | |
171 | |
172 DCHECK(channel_); | |
173 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | |
174 DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId); | |
175 | |
176 message->SerializeAndCloseDispatchers(channel_); | |
177 message->set_source_id(local_id_); | |
178 message->set_destination_id(remote_id_); | |
179 return channel_->WriteMessage(message.Pass()); | |
180 } | |
181 | |
182 } // namespace system | |
183 } // namespace mojo | |
OLD | NEW |