Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(345)

Side by Side Diff: content/browser/message_port_service.cc

Issue 937503003: Make sure MessagePortService is used in a thread safe manner. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: address nit Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/message_port_service.h" 5 #include "content/browser/message_port_service.h"
6 6
7 #include "content/common/message_port_messages.h" 7 #include "content/common/message_port_messages.h"
8 #include "content/public/browser/browser_thread.h" 8 #include "content/public/browser/browser_thread.h"
9 #include "content/public/browser/message_port_delegate.h" 9 #include "content/public/browser/message_port_delegate.h"
10 10
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
56 MessagePortService::MessagePortService() 56 MessagePortService::MessagePortService()
57 : next_message_port_id_(0) { 57 : next_message_port_id_(0) {
58 } 58 }
59 59
60 MessagePortService::~MessagePortService() { 60 MessagePortService::~MessagePortService() {
61 } 61 }
62 62
63 void MessagePortService::UpdateMessagePort(int message_port_id, 63 void MessagePortService::UpdateMessagePort(int message_port_id,
64 MessagePortDelegate* delegate, 64 MessagePortDelegate* delegate,
65 int routing_id) { 65 int routing_id) {
66 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 66 DCHECK_CURRENTLY_ON(BrowserThread::IO);
67 if (!message_ports_.count(message_port_id)) { 67 if (!message_ports_.count(message_port_id)) {
68 NOTREACHED(); 68 NOTREACHED();
69 return; 69 return;
70 } 70 }
71 71
72 MessagePort& port = message_ports_[message_port_id]; 72 MessagePort& port = message_ports_[message_port_id];
73 port.delegate = delegate; 73 port.delegate = delegate;
74 port.route_id = routing_id; 74 port.route_id = routing_id;
75 } 75 }
76 76
77 void MessagePortService::OnMessagePortDelegateClosing( 77 void MessagePortService::OnMessagePortDelegateClosing(
78 MessagePortDelegate* delegate) { 78 MessagePortDelegate* delegate) {
79 DCHECK_CURRENTLY_ON(BrowserThread::IO);
79 // Check if the (possibly) crashed process had any message ports. 80 // Check if the (possibly) crashed process had any message ports.
80 for (MessagePorts::iterator iter = message_ports_.begin(); 81 for (MessagePorts::iterator iter = message_ports_.begin();
81 iter != message_ports_.end();) { 82 iter != message_ports_.end();) {
82 MessagePorts::iterator cur_item = iter++; 83 MessagePorts::iterator cur_item = iter++;
83 if (cur_item->second.delegate == delegate) { 84 if (cur_item->second.delegate == delegate) {
84 Erase(cur_item->first); 85 Erase(cur_item->first);
85 } 86 }
86 } 87 }
87 } 88 }
88 89
89 void MessagePortService::Create(int route_id, 90 void MessagePortService::Create(int route_id,
90 MessagePortDelegate* delegate, 91 MessagePortDelegate* delegate,
91 int* message_port_id) { 92 int* message_port_id) {
92 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 93 DCHECK_CURRENTLY_ON(BrowserThread::IO);
93 *message_port_id = ++next_message_port_id_; 94 *message_port_id = ++next_message_port_id_;
94 95
95 MessagePort port; 96 MessagePort port;
96 port.delegate = delegate; 97 port.delegate = delegate;
97 port.route_id = route_id; 98 port.route_id = route_id;
98 port.message_port_id = *message_port_id; 99 port.message_port_id = *message_port_id;
99 port.entangled_message_port_id = MSG_ROUTING_NONE; 100 port.entangled_message_port_id = MSG_ROUTING_NONE;
100 port.queue_for_inflight_messages = false; 101 port.queue_for_inflight_messages = false;
101 port.hold_messages_for_destination = false; 102 port.hold_messages_for_destination = false;
102 port.should_be_destroyed = false; 103 port.should_be_destroyed = false;
103 message_ports_[*message_port_id] = port; 104 message_ports_[*message_port_id] = port;
104 } 105 }
105 106
106 void MessagePortService::Destroy(int message_port_id) { 107 void MessagePortService::Destroy(int message_port_id) {
108 DCHECK_CURRENTLY_ON(BrowserThread::IO);
107 if (!message_ports_.count(message_port_id)) { 109 if (!message_ports_.count(message_port_id)) {
108 NOTREACHED(); 110 NOTREACHED();
109 return; 111 return;
110 } 112 }
111 113
112 DCHECK(message_ports_[message_port_id].queued_messages.empty()); 114 DCHECK(message_ports_[message_port_id].queued_messages.empty());
113 115
114 Erase(message_port_id); 116 Erase(message_port_id);
115 } 117 }
116 118
117 void MessagePortService::Entangle(int local_message_port_id, 119 void MessagePortService::Entangle(int local_message_port_id,
118 int remote_message_port_id) { 120 int remote_message_port_id) {
121 DCHECK_CURRENTLY_ON(BrowserThread::IO);
119 if (!message_ports_.count(local_message_port_id) || 122 if (!message_ports_.count(local_message_port_id) ||
120 !message_ports_.count(remote_message_port_id)) { 123 !message_ports_.count(remote_message_port_id)) {
121 NOTREACHED(); 124 NOTREACHED();
122 return; 125 return;
123 } 126 }
124 127
125 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == 128 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id ==
126 MSG_ROUTING_NONE); 129 MSG_ROUTING_NONE);
127 message_ports_[remote_message_port_id].entangled_message_port_id = 130 message_ports_[remote_message_port_id].entangled_message_port_id =
128 local_message_port_id; 131 local_message_port_id;
129 } 132 }
130 133
131 void MessagePortService::PostMessage( 134 void MessagePortService::PostMessage(
132 int sender_message_port_id, 135 int sender_message_port_id,
133 const base::string16& message, 136 const base::string16& message,
134 const std::vector<int>& sent_message_port_ids) { 137 const std::vector<int>& sent_message_port_ids) {
138 DCHECK_CURRENTLY_ON(BrowserThread::IO);
135 if (!message_ports_.count(sender_message_port_id)) { 139 if (!message_ports_.count(sender_message_port_id)) {
136 NOTREACHED(); 140 NOTREACHED();
137 return; 141 return;
138 } 142 }
139 143
140 int entangled_message_port_id = 144 int entangled_message_port_id =
141 message_ports_[sender_message_port_id].entangled_message_port_id; 145 message_ports_[sender_message_port_id].entangled_message_port_id;
142 if (entangled_message_port_id == MSG_ROUTING_NONE) 146 if (entangled_message_port_id == MSG_ROUTING_NONE)
143 return; // Process could have crashed. 147 return; // Process could have crashed.
144 148
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
188 NOTREACHED(); 192 NOTREACHED();
189 return; 193 return;
190 } 194 }
191 195
192 // Now send the message to the entangled port. 196 // Now send the message to the entangled port.
193 entangled_port.delegate->SendMessage(entangled_port.route_id, message, 197 entangled_port.delegate->SendMessage(entangled_port.route_id, message,
194 sent_message_port_ids); 198 sent_message_port_ids);
195 } 199 }
196 200
197 void MessagePortService::QueueMessages(int message_port_id) { 201 void MessagePortService::QueueMessages(int message_port_id) {
202 DCHECK_CURRENTLY_ON(BrowserThread::IO);
198 if (!message_ports_.count(message_port_id)) { 203 if (!message_ports_.count(message_port_id)) {
199 NOTREACHED(); 204 NOTREACHED();
200 return; 205 return;
201 } 206 }
202 207
203 MessagePort& port = message_ports_[message_port_id]; 208 MessagePort& port = message_ports_[message_port_id];
204 if (port.delegate) { 209 if (port.delegate) {
205 port.delegate->SendMessagesAreQueued(port.route_id); 210 port.delegate->SendMessagesAreQueued(port.route_id);
206 port.queue_for_inflight_messages = true; 211 port.queue_for_inflight_messages = true;
207 port.delegate = NULL; 212 port.delegate = NULL;
208 } 213 }
209 } 214 }
210 215
211 void MessagePortService::SendQueuedMessages( 216 void MessagePortService::SendQueuedMessages(
212 int message_port_id, 217 int message_port_id,
213 const QueuedMessages& queued_messages) { 218 const QueuedMessages& queued_messages) {
219 DCHECK_CURRENTLY_ON(BrowserThread::IO);
214 if (!message_ports_.count(message_port_id)) { 220 if (!message_ports_.count(message_port_id)) {
215 NOTREACHED(); 221 NOTREACHED();
216 return; 222 return;
217 } 223 }
218 224
219 // Send the queued messages to the port again. This time they'll reach the 225 // Send the queued messages to the port again. This time they'll reach the
220 // new location. 226 // new location.
221 MessagePort& port = message_ports_[message_port_id]; 227 MessagePort& port = message_ports_[message_port_id];
222 port.queue_for_inflight_messages = false; 228 port.queue_for_inflight_messages = false;
223 229
224 // If the port is currently holding messages waiting for the target renderer, 230 // If the port is currently holding messages waiting for the target renderer,
225 // all ports in messages being sent to the port should also be put on hold. 231 // all ports in messages being sent to the port should also be put on hold.
226 if (port.hold_messages_for_destination) { 232 if (port.hold_messages_for_destination) {
227 for (const auto& message : queued_messages) 233 for (const auto& message : queued_messages)
228 for (int sent_message_port_id : message.second) 234 for (int sent_message_port_id : message.second)
229 HoldMessages(sent_message_port_id); 235 HoldMessages(sent_message_port_id);
230 } 236 }
231 237
232 port.queued_messages.insert(port.queued_messages.begin(), 238 port.queued_messages.insert(port.queued_messages.begin(),
233 queued_messages.begin(), 239 queued_messages.begin(),
234 queued_messages.end()); 240 queued_messages.end());
235 241
236 if (port.should_be_destroyed) 242 if (port.should_be_destroyed)
237 ClosePort(message_port_id); 243 ClosePort(message_port_id);
238 else 244 else
239 SendQueuedMessagesIfPossible(message_port_id); 245 SendQueuedMessagesIfPossible(message_port_id);
240 } 246 }
241 247
242 void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) { 248 void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) {
249 DCHECK_CURRENTLY_ON(BrowserThread::IO);
243 if (!message_ports_.count(message_port_id)) { 250 if (!message_ports_.count(message_port_id)) {
244 NOTREACHED(); 251 NOTREACHED();
245 return; 252 return;
246 } 253 }
247 254
248 MessagePort& port = message_ports_[message_port_id]; 255 MessagePort& port = message_ports_[message_port_id];
249 if (port.queue_messages() || !port.delegate) 256 if (port.queue_messages() || !port.delegate)
250 return; 257 return;
251 258
252 for (QueuedMessages::iterator iter = port.queued_messages.begin(); 259 for (QueuedMessages::iterator iter = port.queued_messages.begin();
253 iter != port.queued_messages.end(); ++iter) { 260 iter != port.queued_messages.end(); ++iter) {
254 PostMessageTo(message_port_id, iter->first, iter->second); 261 PostMessageTo(message_port_id, iter->first, iter->second);
255 } 262 }
256 port.queued_messages.clear(); 263 port.queued_messages.clear();
257 } 264 }
258 265
259 void MessagePortService::HoldMessages(int message_port_id) { 266 void MessagePortService::HoldMessages(int message_port_id) {
267 DCHECK_CURRENTLY_ON(BrowserThread::IO);
260 if (!message_ports_.count(message_port_id)) { 268 if (!message_ports_.count(message_port_id)) {
261 NOTREACHED(); 269 NOTREACHED();
262 return; 270 return;
263 } 271 }
264 272
265 // Any ports in messages currently in the queue should also be put on hold. 273 // Any ports in messages currently in the queue should also be put on hold.
266 for (const auto& message : message_ports_[message_port_id].queued_messages) 274 for (const auto& message : message_ports_[message_port_id].queued_messages)
267 for (int sent_message_port_id : message.second) 275 for (int sent_message_port_id : message.second)
268 HoldMessages(sent_message_port_id); 276 HoldMessages(sent_message_port_id);
269 277
270 message_ports_[message_port_id].hold_messages_for_destination = true; 278 message_ports_[message_port_id].hold_messages_for_destination = true;
271 } 279 }
272 280
273 void MessagePortService::ClosePort(int message_port_id) { 281 void MessagePortService::ClosePort(int message_port_id) {
282 DCHECK_CURRENTLY_ON(BrowserThread::IO);
274 if (!message_ports_.count(message_port_id)) { 283 if (!message_ports_.count(message_port_id)) {
275 NOTREACHED(); 284 NOTREACHED();
276 return; 285 return;
277 } 286 }
278 287
279 if (message_ports_[message_port_id].queue_for_inflight_messages) { 288 if (message_ports_[message_port_id].queue_for_inflight_messages) {
280 message_ports_[message_port_id].should_be_destroyed = true; 289 message_ports_[message_port_id].should_be_destroyed = true;
281 return; 290 return;
282 } 291 }
283 292
284 // First close any message ports in the queue for this message port. 293 // First close any message ports in the queue for this message port.
285 for (const auto& message : message_ports_[message_port_id].queued_messages) 294 for (const auto& message : message_ports_[message_port_id].queued_messages)
286 for (int sent_message_port_id : message.second) 295 for (int sent_message_port_id : message.second)
287 ClosePort(sent_message_port_id); 296 ClosePort(sent_message_port_id);
288 297
289 Erase(message_port_id); 298 Erase(message_port_id);
290 } 299 }
291 300
292 void MessagePortService::ReleaseMessages(int message_port_id) { 301 void MessagePortService::ReleaseMessages(int message_port_id) {
302 DCHECK_CURRENTLY_ON(BrowserThread::IO);
293 if (!message_ports_.count(message_port_id)) { 303 if (!message_ports_.count(message_port_id)) {
294 NOTREACHED(); 304 NOTREACHED();
295 return; 305 return;
296 } 306 }
297 307
298 message_ports_[message_port_id].hold_messages_for_destination = false; 308 message_ports_[message_port_id].hold_messages_for_destination = false;
299 SendQueuedMessagesIfPossible(message_port_id); 309 SendQueuedMessagesIfPossible(message_port_id);
300 } 310 }
301 311
302 void MessagePortService::Erase(int message_port_id) { 312 void MessagePortService::Erase(int message_port_id) {
303 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); 313 MessagePorts::iterator erase_item = message_ports_.find(message_port_id);
304 DCHECK(erase_item != message_ports_.end()); 314 DCHECK(erase_item != message_ports_.end());
305 315
306 int entangled_id = erase_item->second.entangled_message_port_id; 316 int entangled_id = erase_item->second.entangled_message_port_id;
307 if (entangled_id != MSG_ROUTING_NONE) { 317 if (entangled_id != MSG_ROUTING_NONE) {
308 // Do the disentanglement (and be paranoid about the other side existing 318 // Do the disentanglement (and be paranoid about the other side existing
309 // just in case something unusual happened during entanglement). 319 // just in case something unusual happened during entanglement).
310 if (message_ports_.count(entangled_id)) { 320 if (message_ports_.count(entangled_id)) {
311 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; 321 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE;
312 } 322 }
313 } 323 }
314 message_ports_.erase(erase_item); 324 message_ports_.erase(erase_item);
315 } 325 }
316 326
317 } // namespace content 327 } // namespace content
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698