Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(130)

Side by Side Diff: mojo/edk/system/channel_endpoint.cc

Issue 782693004: Update mojo sdk to rev f6c8ec07c01deebc13178d516225fd12695c3dc2 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: hack mojo_system_impl gypi for android :| Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/channel_endpoint.h" 5 #include "mojo/edk/system/channel_endpoint.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/threading/platform_thread.h"
8 #include "mojo/edk/system/channel.h" 9 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/channel_endpoint_client.h" 10 #include "mojo/edk/system/channel_endpoint_client.h"
10 11
11 namespace mojo { 12 namespace mojo {
12 namespace system { 13 namespace system {
13 14
14 ChannelEndpoint::ChannelEndpoint(ChannelEndpointClient* client, 15 ChannelEndpoint::ChannelEndpoint(ChannelEndpointClient* client,
15 unsigned client_port, 16 unsigned client_port,
16 MessageInTransitQueue* message_queue) 17 MessageInTransitQueue* message_queue)
17 : client_(client), client_port_(client_port), channel_(nullptr) { 18 : client_(client),
18 DCHECK(client_.get() || message_queue); 19 client_port_(client_port),
20 channel_(nullptr),
21 is_detached_from_channel_(false) {
22 DCHECK(client_ || message_queue);
19 23
20 if (message_queue) 24 if (message_queue)
21 channel_message_queue_.Swap(message_queue); 25 channel_message_queue_.Swap(message_queue);
22 } 26 }
23 27
24 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) { 28 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
25 DCHECK(message); 29 DCHECK(message);
26 30
27 base::AutoLock locker(lock_); 31 base::AutoLock locker(lock_);
28 32
29 if (!channel_) { 33 if (!channel_) {
30 // We may reach here if we haven't been attached/run yet. 34 // We may reach here if we haven't been attached/run yet.
31 // TODO(vtl): We may also reach here if the channel is shut down early for 35 // TODO(vtl): We may also reach here if the channel is shut down early for
32 // some reason (with live message pipes on it). Ideally, we'd return false 36 // some reason (with live message pipes on it). Ideally, we'd return false
33 // (and not enqueue the message), but we currently don't have a way to check 37 // (and not enqueue the message), but we currently don't have a way to check
34 // this. 38 // this.
35 channel_message_queue_.AddMessage(message.Pass()); 39 channel_message_queue_.AddMessage(message.Pass());
36 return true; 40 return true;
37 } 41 }
38 42
39 return WriteMessageNoLock(message.Pass()); 43 return WriteMessageNoLock(message.Pass());
40 } 44 }
41 45
46 bool ChannelEndpoint::ReplaceClient(ChannelEndpointClient* client,
47 unsigned client_port) {
48 DCHECK(client);
49
50 base::AutoLock locker(lock_);
51 DCHECK(client_);
52 DCHECK(client != client_.get() || client_port != client_port_);
53 client_ = client;
54 client_port_ = client_port;
55 return !is_detached_from_channel_;
56 }
57
42 void ChannelEndpoint::DetachFromClient() { 58 void ChannelEndpoint::DetachFromClient() {
43 { 59 base::AutoLock locker(lock_);
44 base::AutoLock locker(lock_); 60 DCHECK(client_);
45 DCHECK(client_.get()); 61 client_ = nullptr;
46 client_ = nullptr;
47 62
48 if (!channel_) 63 if (!channel_)
49 return; 64 return;
50 DCHECK(local_id_.is_valid()); 65 channel_->DetachEndpoint(this, local_id_, remote_id_);
51 DCHECK(remote_id_.is_valid()); 66 ResetChannelNoLock();
52 channel_->DetachEndpoint(this, local_id_, remote_id_);
53 channel_ = nullptr;
54 local_id_ = ChannelEndpointId();
55 remote_id_ = ChannelEndpointId();
56 }
57 } 67 }
58 68
59 void ChannelEndpoint::AttachAndRun(Channel* channel, 69 void ChannelEndpoint::AttachAndRun(Channel* channel,
60 ChannelEndpointId local_id, 70 ChannelEndpointId local_id,
61 ChannelEndpointId remote_id) { 71 ChannelEndpointId remote_id) {
62 DCHECK(channel); 72 DCHECK(channel);
63 DCHECK(local_id.is_valid()); 73 DCHECK(local_id.is_valid());
64 DCHECK(remote_id.is_valid()); 74 DCHECK(remote_id.is_valid());
65 75
66 base::AutoLock locker(lock_); 76 base::AutoLock locker(lock_);
67 DCHECK(!channel_); 77 DCHECK(!channel_);
68 DCHECK(!local_id_.is_valid()); 78 DCHECK(!local_id_.is_valid());
69 DCHECK(!remote_id_.is_valid()); 79 DCHECK(!remote_id_.is_valid());
70 channel_ = channel; 80 channel_ = channel;
71 local_id_ = local_id; 81 local_id_ = local_id;
72 remote_id_ = remote_id; 82 remote_id_ = remote_id;
73 83
74 while (!channel_message_queue_.IsEmpty()) { 84 while (!channel_message_queue_.IsEmpty()) {
75 LOG_IF(WARNING, !WriteMessageNoLock(channel_message_queue_.GetMessage())) 85 LOG_IF(WARNING, !WriteMessageNoLock(channel_message_queue_.GetMessage()))
76 << "Failed to write enqueue message to channel"; 86 << "Failed to write enqueue message to channel";
77 } 87 }
78 88
79 if (!client_.get()) { 89 if (!client_) {
80 channel_->DetachEndpoint(this, local_id_, remote_id_); 90 channel_->DetachEndpoint(this, local_id_, remote_id_);
81 channel_ = nullptr; 91 ResetChannelNoLock();
82 local_id_ = ChannelEndpointId();
83 remote_id_ = ChannelEndpointId();
84 } 92 }
85 } 93 }
86 94
87 void ChannelEndpoint::OnReadMessage(scoped_ptr<MessageInTransit> message) { 95 void ChannelEndpoint::OnReadMessage(scoped_ptr<MessageInTransit> message) {
88 scoped_refptr<ChannelEndpointClient> client; 96 scoped_refptr<ChannelEndpointClient> client;
89 unsigned client_port; 97 unsigned client_port = 0;
90 { 98
91 base::AutoLock locker(lock_); 99 // This loop is to make |ReplaceClient()| work. We can't call the client's
92 DCHECK(channel_); 100 // |OnReadMessage()| under our lock, so by the time we do that, |client| may
93 if (!client_.get()) { 101 // no longer be our client.
94 // This isn't a failure per se. (It just means that, e.g., the other end 102 //
95 // of the message point closed first.) 103 // In that case, |client| must return false. We'll then yield, and retry with
96 return; 104 // the new client. (Theoretically, the client could be replaced again.)
105 //
106 // This solution isn't terribly elegant, but it's the least costly way of
107 // handling/avoiding this (very unlikely) race. (Other solutions -- e.g.,
108 // adding a client message queue, which the client only fetches messages from
109 // -- impose significant cost in the common case.)
110 for (;;) {
111 {
112 base::AutoLock locker(lock_);
113 if (!channel_ || !client_) {
114 // This isn't a failure per se. (It just means that, e.g., the other end
115 // of the message point closed first.)
116 return;
117 }
118
119 // If we get here in a second (third, etc.) iteration of the loop, it's
120 // because |ReplaceClient()| was called.
121 DCHECK(client_ != client || client_port_ != client_port);
122
123 // Take a ref, and call |OnReadMessage()| outside the lock.
124 client = client_;
125 client_port = client_port_;
97 } 126 }
98 127
99 // Take a ref, and call |OnReadMessage()| outside the lock. 128 if (client->OnReadMessage(client_port, message.get())) {
100 client = client_; 129 ignore_result(message.release());
101 client_port = client_port_; 130 break;
131 }
132
133 base::PlatformThread::YieldCurrentThread();
102 } 134 }
103
104 client->OnReadMessage(client_port, message.Pass());
105 } 135 }
106 136
107 void ChannelEndpoint::DetachFromChannel() { 137 void ChannelEndpoint::DetachFromChannel() {
108 scoped_refptr<ChannelEndpointClient> client; 138 scoped_refptr<ChannelEndpointClient> client;
109 unsigned client_port = 0; 139 unsigned client_port = 0;
110 { 140 {
111 base::AutoLock locker(lock_); 141 base::AutoLock locker(lock_);
112 142
113 if (client_.get()) { 143 if (client_) {
114 // Take a ref, and call |OnDetachFromChannel()| outside the lock. 144 // Take a ref, and call |OnDetachFromChannel()| outside the lock.
115 client = client_; 145 client = client_;
116 client_port = client_port_; 146 client_port = client_port_;
117 } 147 }
118 148
119 // |channel_| may already be null if we already detached from the channel in 149 // |channel_| may already be null if we already detached from the channel in
120 // |DetachFromClient()| by calling |Channel::DetachEndpoint()| (and there 150 // |DetachFromClient()| by calling |Channel::DetachEndpoint()| (and there
121 // are racing detaches). 151 // are racing detaches).
122 if (channel_) { 152 if (channel_)
123 DCHECK(local_id_.is_valid()); 153 ResetChannelNoLock();
124 DCHECK(remote_id_.is_valid()); 154 else
125 channel_ = nullptr; 155 DCHECK(is_detached_from_channel_);
126 local_id_ = ChannelEndpointId();
127 remote_id_ = ChannelEndpointId();
128 }
129 } 156 }
130 157
131 if (client.get()) 158 // If |ReplaceClient()| is called (from another thread) after the above locked
159 // section but before we call |OnDetachFromChannel()|, |ReplaceClient()|
160 // return false to notify the caller that the channel was already detached.
161 // (The old client has to accept the arguably-spurious call to
162 // |OnDetachFromChannel()|.)
163 if (client)
132 client->OnDetachFromChannel(client_port); 164 client->OnDetachFromChannel(client_port);
133 } 165 }
134 166
135 ChannelEndpoint::~ChannelEndpoint() { 167 ChannelEndpoint::~ChannelEndpoint() {
136 DCHECK(!client_.get()); 168 DCHECK(!client_);
137 DCHECK(!channel_); 169 DCHECK(!channel_);
138 DCHECK(!local_id_.is_valid()); 170 DCHECK(!local_id_.is_valid());
139 DCHECK(!remote_id_.is_valid()); 171 DCHECK(!remote_id_.is_valid());
140 } 172 }
141 173
142 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) { 174 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) {
143 DCHECK(message); 175 DCHECK(message);
144 176
145 lock_.AssertAcquired(); 177 lock_.AssertAcquired();
146 178
147 DCHECK(channel_); 179 DCHECK(channel_);
148 DCHECK(local_id_.is_valid()); 180 DCHECK(local_id_.is_valid());
149 DCHECK(remote_id_.is_valid()); 181 DCHECK(remote_id_.is_valid());
150 182
151 message->SerializeAndCloseDispatchers(channel_); 183 message->SerializeAndCloseDispatchers(channel_);
152 message->set_source_id(local_id_); 184 message->set_source_id(local_id_);
153 message->set_destination_id(remote_id_); 185 message->set_destination_id(remote_id_);
154 return channel_->WriteMessage(message.Pass()); 186 return channel_->WriteMessage(message.Pass());
155 } 187 }
156 188
189 void ChannelEndpoint::ResetChannelNoLock() {
190 DCHECK(channel_);
191 DCHECK(local_id_.is_valid());
192 DCHECK(remote_id_.is_valid());
193 DCHECK(!is_detached_from_channel_);
194
195 channel_ = nullptr;
196 local_id_ = ChannelEndpointId();
197 remote_id_ = ChannelEndpointId();
198 is_detached_from_channel_ = true;
199 }
200
157 } // namespace system 201 } // namespace system
158 } // namespace mojo 202 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698