Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: content/browser/message_port_service.cc

Issue 942583005: Revert of Make sure MessagePortService is used in a thread safe manner. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/message_port_service.h" 5 #include "content/browser/message_port_service.h"
6 6
7 #include "content/common/message_port_messages.h" 7 #include "content/common/message_port_messages.h"
8 #include "content/public/browser/browser_thread.h" 8 #include "content/public/browser/browser_thread.h"
9 #include "content/public/browser/message_port_delegate.h" 9 #include "content/public/browser/message_port_delegate.h"
10 10
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
56 MessagePortService::MessagePortService() 56 MessagePortService::MessagePortService()
57 : next_message_port_id_(0) { 57 : next_message_port_id_(0) {
58 } 58 }
59 59
60 MessagePortService::~MessagePortService() { 60 MessagePortService::~MessagePortService() {
61 } 61 }
62 62
63 void MessagePortService::UpdateMessagePort(int message_port_id, 63 void MessagePortService::UpdateMessagePort(int message_port_id,
64 MessagePortDelegate* delegate, 64 MessagePortDelegate* delegate,
65 int routing_id) { 65 int routing_id) {
66 DCHECK_CURRENTLY_ON(BrowserThread::IO); 66 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
67 if (!message_ports_.count(message_port_id)) { 67 if (!message_ports_.count(message_port_id)) {
68 NOTREACHED(); 68 NOTREACHED();
69 return; 69 return;
70 } 70 }
71 71
72 MessagePort& port = message_ports_[message_port_id]; 72 MessagePort& port = message_ports_[message_port_id];
73 port.delegate = delegate; 73 port.delegate = delegate;
74 port.route_id = routing_id; 74 port.route_id = routing_id;
75 } 75 }
76 76
77 void MessagePortService::OnMessagePortDelegateClosing( 77 void MessagePortService::OnMessagePortDelegateClosing(
78 MessagePortDelegate* delegate) { 78 MessagePortDelegate* delegate) {
79 DCHECK_CURRENTLY_ON(BrowserThread::IO);
80 // Check if the (possibly) crashed process had any message ports. 79 // Check if the (possibly) crashed process had any message ports.
81 for (MessagePorts::iterator iter = message_ports_.begin(); 80 for (MessagePorts::iterator iter = message_ports_.begin();
82 iter != message_ports_.end();) { 81 iter != message_ports_.end();) {
83 MessagePorts::iterator cur_item = iter++; 82 MessagePorts::iterator cur_item = iter++;
84 if (cur_item->second.delegate == delegate) { 83 if (cur_item->second.delegate == delegate) {
85 Erase(cur_item->first); 84 Erase(cur_item->first);
86 } 85 }
87 } 86 }
88 } 87 }
89 88
90 void MessagePortService::Create(int route_id, 89 void MessagePortService::Create(int route_id,
91 MessagePortDelegate* delegate, 90 MessagePortDelegate* delegate,
92 int* message_port_id) { 91 int* message_port_id) {
93 DCHECK_CURRENTLY_ON(BrowserThread::IO); 92 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
94 *message_port_id = ++next_message_port_id_; 93 *message_port_id = ++next_message_port_id_;
95 94
96 MessagePort port; 95 MessagePort port;
97 port.delegate = delegate; 96 port.delegate = delegate;
98 port.route_id = route_id; 97 port.route_id = route_id;
99 port.message_port_id = *message_port_id; 98 port.message_port_id = *message_port_id;
100 port.entangled_message_port_id = MSG_ROUTING_NONE; 99 port.entangled_message_port_id = MSG_ROUTING_NONE;
101 port.queue_for_inflight_messages = false; 100 port.queue_for_inflight_messages = false;
102 port.hold_messages_for_destination = false; 101 port.hold_messages_for_destination = false;
103 port.should_be_destroyed = false; 102 port.should_be_destroyed = false;
104 message_ports_[*message_port_id] = port; 103 message_ports_[*message_port_id] = port;
105 } 104 }
106 105
107 void MessagePortService::Destroy(int message_port_id) { 106 void MessagePortService::Destroy(int message_port_id) {
108 DCHECK_CURRENTLY_ON(BrowserThread::IO);
109 if (!message_ports_.count(message_port_id)) { 107 if (!message_ports_.count(message_port_id)) {
110 NOTREACHED(); 108 NOTREACHED();
111 return; 109 return;
112 } 110 }
113 111
114 DCHECK(message_ports_[message_port_id].queued_messages.empty()); 112 DCHECK(message_ports_[message_port_id].queued_messages.empty());
115 113
116 Erase(message_port_id); 114 Erase(message_port_id);
117 } 115 }
118 116
119 void MessagePortService::Entangle(int local_message_port_id, 117 void MessagePortService::Entangle(int local_message_port_id,
120 int remote_message_port_id) { 118 int remote_message_port_id) {
121 DCHECK_CURRENTLY_ON(BrowserThread::IO);
122 if (!message_ports_.count(local_message_port_id) || 119 if (!message_ports_.count(local_message_port_id) ||
123 !message_ports_.count(remote_message_port_id)) { 120 !message_ports_.count(remote_message_port_id)) {
124 NOTREACHED(); 121 NOTREACHED();
125 return; 122 return;
126 } 123 }
127 124
128 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == 125 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id ==
129 MSG_ROUTING_NONE); 126 MSG_ROUTING_NONE);
130 message_ports_[remote_message_port_id].entangled_message_port_id = 127 message_ports_[remote_message_port_id].entangled_message_port_id =
131 local_message_port_id; 128 local_message_port_id;
132 } 129 }
133 130
134 void MessagePortService::PostMessage( 131 void MessagePortService::PostMessage(
135 int sender_message_port_id, 132 int sender_message_port_id,
136 const base::string16& message, 133 const base::string16& message,
137 const std::vector<int>& sent_message_port_ids) { 134 const std::vector<int>& sent_message_port_ids) {
138 DCHECK_CURRENTLY_ON(BrowserThread::IO);
139 if (!message_ports_.count(sender_message_port_id)) { 135 if (!message_ports_.count(sender_message_port_id)) {
140 NOTREACHED(); 136 NOTREACHED();
141 return; 137 return;
142 } 138 }
143 139
144 int entangled_message_port_id = 140 int entangled_message_port_id =
145 message_ports_[sender_message_port_id].entangled_message_port_id; 141 message_ports_[sender_message_port_id].entangled_message_port_id;
146 if (entangled_message_port_id == MSG_ROUTING_NONE) 142 if (entangled_message_port_id == MSG_ROUTING_NONE)
147 return; // Process could have crashed. 143 return; // Process could have crashed.
148 144
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
192 NOTREACHED(); 188 NOTREACHED();
193 return; 189 return;
194 } 190 }
195 191
196 // Now send the message to the entangled port. 192 // Now send the message to the entangled port.
197 entangled_port.delegate->SendMessage(entangled_port.route_id, message, 193 entangled_port.delegate->SendMessage(entangled_port.route_id, message,
198 sent_message_port_ids); 194 sent_message_port_ids);
199 } 195 }
200 196
201 void MessagePortService::QueueMessages(int message_port_id) { 197 void MessagePortService::QueueMessages(int message_port_id) {
202 DCHECK_CURRENTLY_ON(BrowserThread::IO);
203 if (!message_ports_.count(message_port_id)) { 198 if (!message_ports_.count(message_port_id)) {
204 NOTREACHED(); 199 NOTREACHED();
205 return; 200 return;
206 } 201 }
207 202
208 MessagePort& port = message_ports_[message_port_id]; 203 MessagePort& port = message_ports_[message_port_id];
209 if (port.delegate) { 204 if (port.delegate) {
210 port.delegate->SendMessagesAreQueued(port.route_id); 205 port.delegate->SendMessagesAreQueued(port.route_id);
211 port.queue_for_inflight_messages = true; 206 port.queue_for_inflight_messages = true;
212 port.delegate = NULL; 207 port.delegate = NULL;
213 } 208 }
214 } 209 }
215 210
216 void MessagePortService::SendQueuedMessages( 211 void MessagePortService::SendQueuedMessages(
217 int message_port_id, 212 int message_port_id,
218 const QueuedMessages& queued_messages) { 213 const QueuedMessages& queued_messages) {
219 DCHECK_CURRENTLY_ON(BrowserThread::IO);
220 if (!message_ports_.count(message_port_id)) { 214 if (!message_ports_.count(message_port_id)) {
221 NOTREACHED(); 215 NOTREACHED();
222 return; 216 return;
223 } 217 }
224 218
225 // Send the queued messages to the port again. This time they'll reach the 219 // Send the queued messages to the port again. This time they'll reach the
226 // new location. 220 // new location.
227 MessagePort& port = message_ports_[message_port_id]; 221 MessagePort& port = message_ports_[message_port_id];
228 port.queue_for_inflight_messages = false; 222 port.queue_for_inflight_messages = false;
229 223
230 // If the port is currently holding messages waiting for the target renderer, 224 // If the port is currently holding messages waiting for the target renderer,
231 // all ports in messages being sent to the port should also be put on hold. 225 // all ports in messages being sent to the port should also be put on hold.
232 if (port.hold_messages_for_destination) { 226 if (port.hold_messages_for_destination) {
233 for (const auto& message : queued_messages) 227 for (const auto& message : queued_messages)
234 for (int sent_message_port_id : message.second) 228 for (int sent_message_port_id : message.second)
235 HoldMessages(sent_message_port_id); 229 HoldMessages(sent_message_port_id);
236 } 230 }
237 231
238 port.queued_messages.insert(port.queued_messages.begin(), 232 port.queued_messages.insert(port.queued_messages.begin(),
239 queued_messages.begin(), 233 queued_messages.begin(),
240 queued_messages.end()); 234 queued_messages.end());
241 235
242 if (port.should_be_destroyed) 236 if (port.should_be_destroyed)
243 ClosePort(message_port_id); 237 ClosePort(message_port_id);
244 else 238 else
245 SendQueuedMessagesIfPossible(message_port_id); 239 SendQueuedMessagesIfPossible(message_port_id);
246 } 240 }
247 241
248 void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) { 242 void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) {
249 DCHECK_CURRENTLY_ON(BrowserThread::IO);
250 if (!message_ports_.count(message_port_id)) { 243 if (!message_ports_.count(message_port_id)) {
251 NOTREACHED(); 244 NOTREACHED();
252 return; 245 return;
253 } 246 }
254 247
255 MessagePort& port = message_ports_[message_port_id]; 248 MessagePort& port = message_ports_[message_port_id];
256 if (port.queue_messages() || !port.delegate) 249 if (port.queue_messages() || !port.delegate)
257 return; 250 return;
258 251
259 for (QueuedMessages::iterator iter = port.queued_messages.begin(); 252 for (QueuedMessages::iterator iter = port.queued_messages.begin();
260 iter != port.queued_messages.end(); ++iter) { 253 iter != port.queued_messages.end(); ++iter) {
261 PostMessageTo(message_port_id, iter->first, iter->second); 254 PostMessageTo(message_port_id, iter->first, iter->second);
262 } 255 }
263 port.queued_messages.clear(); 256 port.queued_messages.clear();
264 } 257 }
265 258
266 void MessagePortService::HoldMessages(int message_port_id) { 259 void MessagePortService::HoldMessages(int message_port_id) {
267 DCHECK_CURRENTLY_ON(BrowserThread::IO);
268 if (!message_ports_.count(message_port_id)) { 260 if (!message_ports_.count(message_port_id)) {
269 NOTREACHED(); 261 NOTREACHED();
270 return; 262 return;
271 } 263 }
272 264
273 // Any ports in messages currently in the queue should also be put on hold. 265 // Any ports in messages currently in the queue should also be put on hold.
274 for (const auto& message : message_ports_[message_port_id].queued_messages) 266 for (const auto& message : message_ports_[message_port_id].queued_messages)
275 for (int sent_message_port_id : message.second) 267 for (int sent_message_port_id : message.second)
276 HoldMessages(sent_message_port_id); 268 HoldMessages(sent_message_port_id);
277 269
278 message_ports_[message_port_id].hold_messages_for_destination = true; 270 message_ports_[message_port_id].hold_messages_for_destination = true;
279 } 271 }
280 272
281 void MessagePortService::ClosePort(int message_port_id) { 273 void MessagePortService::ClosePort(int message_port_id) {
282 DCHECK_CURRENTLY_ON(BrowserThread::IO);
283 if (!message_ports_.count(message_port_id)) { 274 if (!message_ports_.count(message_port_id)) {
284 NOTREACHED(); 275 NOTREACHED();
285 return; 276 return;
286 } 277 }
287 278
288 if (message_ports_[message_port_id].queue_for_inflight_messages) { 279 if (message_ports_[message_port_id].queue_for_inflight_messages) {
289 message_ports_[message_port_id].should_be_destroyed = true; 280 message_ports_[message_port_id].should_be_destroyed = true;
290 return; 281 return;
291 } 282 }
292 283
293 // First close any message ports in the queue for this message port. 284 // First close any message ports in the queue for this message port.
294 for (const auto& message : message_ports_[message_port_id].queued_messages) 285 for (const auto& message : message_ports_[message_port_id].queued_messages)
295 for (int sent_message_port_id : message.second) 286 for (int sent_message_port_id : message.second)
296 ClosePort(sent_message_port_id); 287 ClosePort(sent_message_port_id);
297 288
298 Erase(message_port_id); 289 Erase(message_port_id);
299 } 290 }
300 291
301 void MessagePortService::ReleaseMessages(int message_port_id) { 292 void MessagePortService::ReleaseMessages(int message_port_id) {
302 DCHECK_CURRENTLY_ON(BrowserThread::IO);
303 if (!message_ports_.count(message_port_id)) { 293 if (!message_ports_.count(message_port_id)) {
304 NOTREACHED(); 294 NOTREACHED();
305 return; 295 return;
306 } 296 }
307 297
308 message_ports_[message_port_id].hold_messages_for_destination = false; 298 message_ports_[message_port_id].hold_messages_for_destination = false;
309 SendQueuedMessagesIfPossible(message_port_id); 299 SendQueuedMessagesIfPossible(message_port_id);
310 } 300 }
311 301
312 void MessagePortService::Erase(int message_port_id) { 302 void MessagePortService::Erase(int message_port_id) {
313 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); 303 MessagePorts::iterator erase_item = message_ports_.find(message_port_id);
314 DCHECK(erase_item != message_ports_.end()); 304 DCHECK(erase_item != message_ports_.end());
315 305
316 int entangled_id = erase_item->second.entangled_message_port_id; 306 int entangled_id = erase_item->second.entangled_message_port_id;
317 if (entangled_id != MSG_ROUTING_NONE) { 307 if (entangled_id != MSG_ROUTING_NONE) {
318 // Do the disentanglement (and be paranoid about the other side existing 308 // Do the disentanglement (and be paranoid about the other side existing
319 // just in case something unusual happened during entanglement). 309 // just in case something unusual happened during entanglement).
320 if (message_ports_.count(entangled_id)) { 310 if (message_ports_.count(entangled_id)) {
321 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; 311 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE;
322 } 312 }
323 } 313 }
324 message_ports_.erase(erase_item); 314 message_ports_.erase(erase_item);
325 } 315 }
326 316
327 } // namespace content 317 } // namespace content
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698