Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Side by Side Diff: mojo/system/channel_endpoint.cc

Issue 621153003: Move mojo edk into mojo/edk (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix checkdeps Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/system/channel_endpoint.h ('k') | mojo/system/channel_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/channel_endpoint.h"
6
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/message_pipe.h"
10 #include "mojo/system/transport_data.h"
11
12 namespace mojo {
13 namespace system {
14
15 ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port)
16 : state_(STATE_NORMAL),
17 message_pipe_(message_pipe),
18 port_(port),
19 channel_(),
20 local_id_(MessageInTransit::kInvalidEndpointId),
21 remote_id_(MessageInTransit::kInvalidEndpointId) {
22 DCHECK(message_pipe_.get());
23 DCHECK(port_ == 0 || port_ == 1);
24 }
25
26 void ChannelEndpoint::TakeMessages(MessageInTransitQueue* message_queue) {
27 DCHECK(paused_message_queue_.IsEmpty());
28 paused_message_queue_.Swap(message_queue);
29 }
30
31 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
32 DCHECK(message);
33
34 base::AutoLock locker(lock_);
35
36 if (!channel_ || remote_id_ == MessageInTransit::kInvalidEndpointId) {
37 // We may reach here if we haven't been attached or run yet.
38 // TODO(vtl): We may also reach here if the channel is shut down early for
39 // some reason (with live message pipes on it). We can't check |state_| yet,
40 // until it's protected under lock, but in this case we should return false
41 // (and not enqueue any messages).
42 paused_message_queue_.AddMessage(message.Pass());
43 return true;
44 }
45
46 // TODO(vtl): Currently, this only works in the "running" case.
47 DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId);
48
49 return WriteMessageNoLock(message.Pass());
50 }
51
52 void ChannelEndpoint::DetachFromMessagePipe() {
53 // TODO(vtl): Once |message_pipe_| is under |lock_|, we should null it out
54 // here. For now, get the channel to do so for us.
55
56 scoped_refptr<Channel> channel;
57 {
58 base::AutoLock locker(lock_);
59 DCHECK(message_pipe_.get());
60 message_pipe_ = nullptr;
61
62 if (!channel_)
63 return;
64 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
65 // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid
66 // here as well.
67 channel = channel_;
68 }
69 // Don't call this under |lock_|, since it'll call us back.
70 // TODO(vtl): This seems pretty suboptimal.
71 channel->DetachMessagePipeEndpoint(local_id_, remote_id_);
72 }
73
74 void ChannelEndpoint::AttachToChannel(Channel* channel,
75 MessageInTransit::EndpointId local_id) {
76 DCHECK(channel);
77 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
78
79 base::AutoLock locker(lock_);
80 DCHECK(!channel_);
81 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
82 channel_ = channel;
83 local_id_ = local_id;
84 }
85
86 void ChannelEndpoint::Run(MessageInTransit::EndpointId remote_id) {
87 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
88
89 base::AutoLock locker(lock_);
90 DCHECK(channel_);
91 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
92 remote_id_ = remote_id;
93
94 while (!paused_message_queue_.IsEmpty()) {
95 LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage()))
96 << "Failed to write enqueue message to channel";
97 }
98 }
99
100 bool ChannelEndpoint::OnReadMessage(
101 const MessageInTransit::View& message_view,
102 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
103 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
104 scoped_refptr<MessagePipe> message_pipe;
105 unsigned port;
106 {
107 base::AutoLock locker(lock_);
108 DCHECK(channel_);
109 if (!message_pipe_.get()) {
110 // This isn't a failure per se. (It just means that, e.g., the other end
111 // of the message point closed first.)
112 return true;
113 }
114
115 if (message_view.transport_data_buffer_size() > 0) {
116 DCHECK(message_view.transport_data_buffer());
117 message->SetDispatchers(TransportData::DeserializeDispatchers(
118 message_view.transport_data_buffer(),
119 message_view.transport_data_buffer_size(),
120 platform_handles.Pass(),
121 channel_));
122 }
123
124 // Take a ref, and call |EnqueueMessage()| outside the lock.
125 message_pipe = message_pipe_;
126 port = port_;
127 }
128
129 MojoResult result = message_pipe->EnqueueMessage(
130 MessagePipe::GetPeerPort(port), message.Pass());
131 return (result == MOJO_RESULT_OK);
132 }
133
134 void ChannelEndpoint::OnDisconnect() {
135 scoped_refptr<MessagePipe> message_pipe;
136 unsigned port;
137 {
138 base::AutoLock locker(lock_);
139 if (!message_pipe_.get())
140 return;
141
142 // Take a ref, and call |Close()| outside the lock.
143 message_pipe = message_pipe_;
144 port = port_;
145 }
146 message_pipe->Close(port);
147 }
148
149 void ChannelEndpoint::DetachFromChannel() {
150 base::AutoLock locker(lock_);
151 DCHECK(channel_);
152 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
153 // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid
154 // here as well.
155 channel_ = nullptr;
156 local_id_ = MessageInTransit::kInvalidEndpointId;
157 remote_id_ = MessageInTransit::kInvalidEndpointId;
158 }
159
160 ChannelEndpoint::~ChannelEndpoint() {
161 DCHECK(!message_pipe_.get());
162 DCHECK(!channel_);
163 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
164 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
165 }
166
167 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) {
168 DCHECK(message);
169
170 lock_.AssertAcquired();
171
172 DCHECK(channel_);
173 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
174 DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId);
175
176 message->SerializeAndCloseDispatchers(channel_);
177 message->set_source_id(local_id_);
178 message->set_destination_id(remote_id_);
179 return channel_->WriteMessage(message.Pass());
180 }
181
182 } // namespace system
183 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/channel_endpoint.h ('k') | mojo/system/channel_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698