Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: mojo/edk/system/channel_endpoint.cc

Issue 738453003: Add a ChannelEndpointClient abstraction. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: review comments Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/channel_endpoint.h ('k') | mojo/edk/system/channel_endpoint_client.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/channel_endpoint.h" 5 #include "mojo/edk/system/channel_endpoint.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h" 8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/message_pipe.h" 9 #include "mojo/edk/system/channel_endpoint_client.h"
10 #include "mojo/edk/system/transport_data.h" 10 #include "mojo/edk/system/transport_data.h"
11 11
12 namespace mojo { 12 namespace mojo {
13 namespace system { 13 namespace system {
14 14
15 ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, 15 ChannelEndpoint::ChannelEndpoint(ChannelEndpointClient* client,
16 unsigned port, 16 unsigned client_port,
17 MessageInTransitQueue* message_queue) 17 MessageInTransitQueue* message_queue)
18 : message_pipe_(message_pipe), port_(port), channel_(nullptr) { 18 : client_(client), client_port_(client_port), channel_(nullptr) {
19 DCHECK(message_pipe_.get() || message_queue); 19 DCHECK(client_.get() || message_queue);
20 DCHECK(port_ == 0 || port_ == 1);
21 20
22 if (message_queue) 21 if (message_queue)
23 paused_message_queue_.Swap(message_queue); 22 paused_message_queue_.Swap(message_queue);
24 } 23 }
25 24
26 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) { 25 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
27 DCHECK(message); 26 DCHECK(message);
28 27
29 base::AutoLock locker(lock_); 28 base::AutoLock locker(lock_);
30 29
31 if (!channel_ || !remote_id_.is_valid()) { 30 if (!channel_ || !remote_id_.is_valid()) {
32 // We may reach here if we haven't been attached or run yet. 31 // We may reach here if we haven't been attached or run yet.
33 // TODO(vtl): We may also reach here if the channel is shut down early for 32 // TODO(vtl): We may also reach here if the channel is shut down early for
34 // some reason (with live message pipes on it). We can't check |state_| yet, 33 // some reason (with live message pipes on it). We can't check |state_| yet,
35 // until it's protected under lock, but in this case we should return false 34 // until it's protected under lock, but in this case we should return false
36 // (and not enqueue any messages). 35 // (and not enqueue any messages).
37 paused_message_queue_.AddMessage(message.Pass()); 36 paused_message_queue_.AddMessage(message.Pass());
38 return true; 37 return true;
39 } 38 }
40 39
41 // TODO(vtl): Currently, this only works in the "running" case. 40 // TODO(vtl): Currently, this only works in the "running" case.
42 DCHECK(remote_id_.is_valid()); 41 DCHECK(remote_id_.is_valid());
43 42
44 return WriteMessageNoLock(message.Pass()); 43 return WriteMessageNoLock(message.Pass());
45 } 44 }
46 45
47 void ChannelEndpoint::DetachFromMessagePipe() { 46 void ChannelEndpoint::DetachFromClient() {
48 { 47 {
49 base::AutoLock locker(lock_); 48 base::AutoLock locker(lock_);
50 DCHECK(message_pipe_.get()); 49 DCHECK(client_.get());
51 message_pipe_ = nullptr; 50 client_ = nullptr;
52 51
53 if (!channel_) 52 if (!channel_)
54 return; 53 return;
55 DCHECK(local_id_.is_valid()); 54 DCHECK(local_id_.is_valid());
56 DCHECK(remote_id_.is_valid()); 55 DCHECK(remote_id_.is_valid());
57 channel_->DetachEndpoint(this, local_id_, remote_id_); 56 channel_->DetachEndpoint(this, local_id_, remote_id_);
58 channel_ = nullptr; 57 channel_ = nullptr;
59 local_id_ = ChannelEndpointId(); 58 local_id_ = ChannelEndpointId();
60 remote_id_ = ChannelEndpointId(); 59 remote_id_ = ChannelEndpointId();
61 } 60 }
(...skipping 12 matching lines...) Expand all
74 DCHECK(!remote_id_.is_valid()); 73 DCHECK(!remote_id_.is_valid());
75 channel_ = channel; 74 channel_ = channel;
76 local_id_ = local_id; 75 local_id_ = local_id;
77 remote_id_ = remote_id; 76 remote_id_ = remote_id;
78 77
79 while (!paused_message_queue_.IsEmpty()) { 78 while (!paused_message_queue_.IsEmpty()) {
80 LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage())) 79 LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage()))
81 << "Failed to write enqueue message to channel"; 80 << "Failed to write enqueue message to channel";
82 } 81 }
83 82
84 if (!message_pipe_.get()) { 83 if (!client_.get()) {
85 channel_->DetachEndpoint(this, local_id_, remote_id_); 84 channel_->DetachEndpoint(this, local_id_, remote_id_);
86 channel_ = nullptr; 85 channel_ = nullptr;
87 local_id_ = ChannelEndpointId(); 86 local_id_ = ChannelEndpointId();
88 remote_id_ = ChannelEndpointId(); 87 remote_id_ = ChannelEndpointId();
89 } 88 }
90 } 89 }
91 90
92 bool ChannelEndpoint::OnReadMessage( 91 bool ChannelEndpoint::OnReadMessage(
93 const MessageInTransit::View& message_view, 92 const MessageInTransit::View& message_view,
94 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 93 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
95 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 94 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
96 scoped_refptr<MessagePipe> message_pipe; 95 scoped_refptr<ChannelEndpointClient> client;
97 unsigned port; 96 unsigned client_port;
98 { 97 {
99 base::AutoLock locker(lock_); 98 base::AutoLock locker(lock_);
100 DCHECK(channel_); 99 DCHECK(channel_);
101 if (!message_pipe_.get()) { 100 if (!client_.get()) {
102 // This isn't a failure per se. (It just means that, e.g., the other end 101 // This isn't a failure per se. (It just means that, e.g., the other end
103 // of the message point closed first.) 102 // of the message point closed first.)
104 return true; 103 return true;
105 } 104 }
106 105
107 if (message_view.transport_data_buffer_size() > 0) { 106 if (message_view.transport_data_buffer_size() > 0) {
108 DCHECK(message_view.transport_data_buffer()); 107 DCHECK(message_view.transport_data_buffer());
109 message->SetDispatchers(TransportData::DeserializeDispatchers( 108 message->SetDispatchers(TransportData::DeserializeDispatchers(
110 message_view.transport_data_buffer(), 109 message_view.transport_data_buffer(),
111 message_view.transport_data_buffer_size(), platform_handles.Pass(), 110 message_view.transport_data_buffer_size(), platform_handles.Pass(),
112 channel_)); 111 channel_));
113 } 112 }
114 113
115 // Take a ref, and call |EnqueueMessage()| outside the lock. 114 // Take a ref, and call |EnqueueMessage()| outside the lock.
116 message_pipe = message_pipe_; 115 client = client_;
117 port = port_; 116 client_port = client_port_;
118 } 117 }
119 118
120 MojoResult result = message_pipe->EnqueueMessage( 119 return client->OnReadMessage(client_port, message.Pass());
121 MessagePipe::GetPeerPort(port), message.Pass());
122 return (result == MOJO_RESULT_OK);
123 } 120 }
124 121
125 void ChannelEndpoint::DetachFromChannel() { 122 void ChannelEndpoint::DetachFromChannel() {
126 scoped_refptr<MessagePipe> message_pipe; 123 scoped_refptr<ChannelEndpointClient> client;
127 unsigned port; 124 unsigned client_port;
128 { 125 {
129 base::AutoLock locker(lock_); 126 base::AutoLock locker(lock_);
130 127
131 if (message_pipe_.get()) { 128 if (client_.get()) {
132 // Take a ref, and call |Close()| outside the lock. 129 // Take a ref, and call |OnDetachFromChannel()| outside the lock.
133 message_pipe = message_pipe_; 130 client = client_;
134 port = port_; 131 client_port = client_port_;
135 } 132 }
136 133
137 // |channel_| may already be null if we already detached from the channel in 134 // |channel_| may already be null if we already detached from the channel in
138 // |DetachFromMessagePipe()| by calling |Channel::DetachEndpoint()| (and 135 // |DetachFromClient()| by calling |Channel::DetachEndpoint()| (and there
139 // there are racing detaches). 136 // are racing detaches).
140 if (channel_) { 137 if (channel_) {
141 DCHECK(local_id_.is_valid()); 138 DCHECK(local_id_.is_valid());
142 DCHECK(remote_id_.is_valid()); 139 DCHECK(remote_id_.is_valid());
143 channel_ = nullptr; 140 channel_ = nullptr;
144 local_id_ = ChannelEndpointId(); 141 local_id_ = ChannelEndpointId();
145 remote_id_ = ChannelEndpointId(); 142 remote_id_ = ChannelEndpointId();
146 } 143 }
147 } 144 }
148 145
149 if (message_pipe.get()) 146 if (client.get())
150 message_pipe->Close(port); 147 client->OnDetachFromChannel(client_port);
151 } 148 }
152 149
153 ChannelEndpoint::~ChannelEndpoint() { 150 ChannelEndpoint::~ChannelEndpoint() {
154 DCHECK(!message_pipe_.get()); 151 DCHECK(!client_.get());
155 DCHECK(!channel_); 152 DCHECK(!channel_);
156 DCHECK(!local_id_.is_valid()); 153 DCHECK(!local_id_.is_valid());
157 DCHECK(!remote_id_.is_valid()); 154 DCHECK(!remote_id_.is_valid());
158 } 155 }
159 156
160 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) { 157 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) {
161 DCHECK(message); 158 DCHECK(message);
162 159
163 lock_.AssertAcquired(); 160 lock_.AssertAcquired();
164 161
165 DCHECK(channel_); 162 DCHECK(channel_);
166 DCHECK(local_id_.is_valid()); 163 DCHECK(local_id_.is_valid());
167 DCHECK(remote_id_.is_valid()); 164 DCHECK(remote_id_.is_valid());
168 165
169 message->SerializeAndCloseDispatchers(channel_); 166 message->SerializeAndCloseDispatchers(channel_);
170 message->set_source_id(local_id_); 167 message->set_source_id(local_id_);
171 message->set_destination_id(remote_id_); 168 message->set_destination_id(remote_id_);
172 return channel_->WriteMessage(message.Pass()); 169 return channel_->WriteMessage(message.Pass());
173 } 170 }
174 171
175 } // namespace system 172 } // namespace system
176 } // namespace mojo 173 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/channel_endpoint.h ('k') | mojo/edk/system/channel_endpoint_client.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698