OLD | NEW |
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "chrome/common/ipc_channel.h" | 5 #include "chrome/common/ipc_channel.h" |
6 | 6 |
7 #include <windows.h> | 7 #include <windows.h> |
8 #include <sstream> | 8 #include <sstream> |
9 | 9 |
10 #include "base/compiler_specific.h" | 10 #include "base/compiler_specific.h" |
11 #include "base/logging.h" | 11 #include "base/logging.h" |
12 #include "base/win_util.h" | 12 #include "base/win_util.h" |
13 #include "chrome/common/chrome_counters.h" | 13 #include "chrome/common/chrome_counters.h" |
14 #include "chrome/common/ipc_logging.h" | 14 #include "chrome/common/ipc_logging.h" |
15 #include "chrome/common/ipc_message_utils.h" | 15 #include "chrome/common/ipc_message_utils.h" |
16 | 16 |
17 using namespace std; | 17 using namespace std; |
18 | 18 |
19 namespace IPC { | 19 namespace IPC { |
20 | 20 |
21 //------------------------------------------------------------------------------ | 21 //------------------------------------------------------------------------------ |
22 | 22 |
23 Channel::State::State() | 23 Channel::State::State(Channel* channel) : is_pending(false) { |
24 : is_pending(false) { | 24 memset(&context.overlapped, 0, sizeof(context.overlapped)); |
25 memset(&overlapped, 0, sizeof(overlapped)); | 25 context.handler = channel; |
26 overlapped.hEvent = CreateEvent(NULL, // default security attributes | |
27 TRUE, // manual-reset event | |
28 TRUE, // initial state = signaled | |
29 NULL); // unnamed event object | |
30 } | 26 } |
31 | 27 |
32 Channel::State::~State() { | 28 Channel::State::~State() { |
33 if (overlapped.hEvent) | 29 COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context); |
34 CloseHandle(overlapped.hEvent); | |
35 } | 30 } |
36 | 31 |
37 //------------------------------------------------------------------------------ | 32 //------------------------------------------------------------------------------ |
38 | 33 |
39 Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) | 34 Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) |
40 : pipe_(INVALID_HANDLE_VALUE), | 35 : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), |
| 36 ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), |
| 37 pipe_(INVALID_HANDLE_VALUE), |
41 listener_(listener), | 38 listener_(listener), |
42 waiting_connect_(mode == MODE_SERVER), | 39 waiting_connect_(mode == MODE_SERVER), |
43 processing_incoming_(false), | 40 processing_incoming_(false), |
44 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { | 41 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { |
45 if (!CreatePipe(channel_id, mode)) { | 42 if (!CreatePipe(channel_id, mode)) { |
46 // The pipe may have been closed already. | 43 // The pipe may have been closed already. |
47 LOG(WARNING) << "Unable to create pipe named \"" << channel_id << | 44 LOG(WARNING) << "Unable to create pipe named \"" << channel_id << |
48 "\" in " << (mode == 0 ? "server" : "client") << " mode."; | 45 "\" in " << (mode == 0 ? "server" : "client") << " mode."; |
49 } | 46 } |
50 } | 47 } |
51 | 48 |
52 void Channel::Close() { | 49 void Channel::Close() { |
53 // make sure we are no longer watching the pipe events | 50 bool waited = false; |
54 MessageLoopForIO* loop = MessageLoopForIO::current(); | 51 if (input_state_.is_pending || output_state_.is_pending) { |
55 if (input_state_.is_pending) { | 52 CancelIo(pipe_); |
56 input_state_.is_pending = false; | 53 waited = true; |
57 loop->RegisterIOContext(&input_state_.overlapped, NULL); | |
58 } | 54 } |
59 | 55 |
60 if (output_state_.is_pending) { | 56 // Closing the handle at this point prevents us from issuing more requests |
61 output_state_.is_pending = false; | 57 // form OnIOCompleted(). |
62 loop->RegisterIOContext(&output_state_.overlapped, NULL); | |
63 } | |
64 | |
65 if (pipe_ != INVALID_HANDLE_VALUE) { | 58 if (pipe_ != INVALID_HANDLE_VALUE) { |
66 CloseHandle(pipe_); | 59 CloseHandle(pipe_); |
67 pipe_ = INVALID_HANDLE_VALUE; | 60 pipe_ = INVALID_HANDLE_VALUE; |
68 } | 61 } |
69 | 62 |
| 63 // Make sure all IO has completed. |
| 64 base::Time start = base::Time::Now(); |
| 65 while (input_state_.is_pending || output_state_.is_pending) { |
| 66 MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); |
| 67 } |
| 68 if (waited) { |
| 69 // We want to see if we block the message loop for too long. |
| 70 UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start); |
| 71 } |
| 72 |
70 while (!output_queue_.empty()) { | 73 while (!output_queue_.empty()) { |
71 Message* m = output_queue_.front(); | 74 Message* m = output_queue_.front(); |
72 output_queue_.pop(); | 75 output_queue_.pop(); |
73 delete m; | 76 delete m; |
74 } | 77 } |
75 } | 78 } |
76 | 79 |
77 bool Channel::Send(Message* message) { | 80 bool Channel::Send(Message* message) { |
78 chrome::Counters::ipc_send_counter().Increment(); | 81 chrome::Counters::ipc_send_counter().Increment(); |
79 #ifdef IPC_MESSAGE_DEBUG_EXTRA | 82 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
(...skipping 88 matching lines...) Loading... |
168 | 171 |
169 // Check to see if there is a client connected to our pipe... | 172 // Check to see if there is a client connected to our pipe... |
170 if (waiting_connect_) | 173 if (waiting_connect_) |
171 ProcessConnection(); | 174 ProcessConnection(); |
172 | 175 |
173 if (!input_state_.is_pending) { | 176 if (!input_state_.is_pending) { |
174 // Complete setup asynchronously. By not setting input_state_.is_pending | 177 // Complete setup asynchronously. By not setting input_state_.is_pending |
175 // to true, we indicate to OnIOCompleted that this is the special | 178 // to true, we indicate to OnIOCompleted that this is the special |
176 // initialization signal. | 179 // initialization signal. |
177 MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( | 180 MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( |
178 &Channel::OnIOCompleted, &input_state_.overlapped, 0, 0)); | 181 &Channel::OnIOCompleted, &input_state_.context, 0, 0)); |
179 } | 182 } |
180 | 183 |
181 if (!waiting_connect_) | 184 if (!waiting_connect_) |
182 ProcessOutgoingMessages(NULL, 0); | 185 ProcessOutgoingMessages(NULL, 0); |
183 return true; | 186 return true; |
184 } | 187 } |
185 | 188 |
186 bool Channel::ProcessConnection() { | 189 bool Channel::ProcessConnection() { |
187 if (input_state_.is_pending) { | 190 if (input_state_.is_pending) |
188 input_state_.is_pending = false; | 191 input_state_.is_pending = false; |
189 MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, | |
190 NULL); | |
191 } | |
192 | 192 |
193 // Do we have a client connected to our pipe? | 193 // Do we have a client connected to our pipe? |
194 DCHECK(pipe_ != INVALID_HANDLE_VALUE); | 194 if (INVALID_HANDLE_VALUE == pipe_) |
195 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.overlapped); | 195 return false; |
| 196 |
| 197 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); |
196 | 198 |
197 DWORD err = GetLastError(); | 199 DWORD err = GetLastError(); |
198 if (ok) { | 200 if (ok) { |
199 // Uhm, the API documentation says that this function should never | 201 // Uhm, the API documentation says that this function should never |
200 // return success when used in overlapped mode. | 202 // return success when used in overlapped mode. |
201 NOTREACHED(); | 203 NOTREACHED(); |
202 return false; | 204 return false; |
203 } | 205 } |
204 | 206 |
205 switch (err) { | 207 switch (err) { |
206 case ERROR_IO_PENDING: | 208 case ERROR_IO_PENDING: |
207 input_state_.is_pending = true; | 209 input_state_.is_pending = true; |
208 MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, | |
209 this); | |
210 break; | 210 break; |
211 case ERROR_PIPE_CONNECTED: | 211 case ERROR_PIPE_CONNECTED: |
212 waiting_connect_ = false; | 212 waiting_connect_ = false; |
213 break; | 213 break; |
214 default: | 214 default: |
215 NOTREACHED(); | 215 NOTREACHED(); |
216 return false; | 216 return false; |
217 } | 217 } |
218 | 218 |
219 return true; | 219 return true; |
220 } | 220 } |
221 | 221 |
222 bool Channel::ProcessIncomingMessages(OVERLAPPED* context, | 222 bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context, |
223 DWORD bytes_read) { | 223 DWORD bytes_read) { |
224 if (input_state_.is_pending) { | 224 if (input_state_.is_pending) { |
225 input_state_.is_pending = false; | 225 input_state_.is_pending = false; |
226 DCHECK(context); | 226 DCHECK(context); |
227 MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, | |
228 NULL); | |
229 | 227 |
230 if (!context || !bytes_read) | 228 if (!context || !bytes_read) |
231 return false; | 229 return false; |
232 } else { | 230 } else { |
233 // This happens at channel initialization. | 231 // This happens at channel initialization. |
234 DCHECK(!bytes_read && context == &input_state_.overlapped); | 232 DCHECK(!bytes_read && context == &input_state_.context); |
235 } | 233 } |
236 | 234 |
237 for (;;) { | 235 for (;;) { |
238 if (bytes_read == 0) { | 236 if (bytes_read == 0) { |
| 237 if (INVALID_HANDLE_VALUE == pipe_) |
| 238 return false; |
| 239 |
239 // Read from pipe... | 240 // Read from pipe... |
240 BOOL ok = ReadFile(pipe_, | 241 BOOL ok = ReadFile(pipe_, |
241 input_buf_, | 242 input_buf_, |
242 BUF_SIZE, | 243 BUF_SIZE, |
243 &bytes_read, | 244 &bytes_read, |
244 &input_state_.overlapped); | 245 &input_state_.context.overlapped); |
245 if (!ok) { | 246 if (!ok) { |
246 DWORD err = GetLastError(); | 247 DWORD err = GetLastError(); |
247 if (err == ERROR_IO_PENDING) { | 248 if (err == ERROR_IO_PENDING) { |
248 MessageLoopForIO::current()->RegisterIOContext( | |
249 &input_state_.overlapped, this); | |
250 input_state_.is_pending = true; | 249 input_state_.is_pending = true; |
251 return true; | 250 return true; |
252 } | 251 } |
253 LOG(ERROR) << "pipe error: " << err; | 252 LOG(ERROR) << "pipe error: " << err; |
254 return false; | 253 return false; |
255 } | 254 } |
| 255 input_state_.is_pending = true; |
| 256 return true; |
256 } | 257 } |
257 DCHECK(bytes_read); | 258 DCHECK(bytes_read); |
258 | 259 |
259 // Process messages from input buffer. | 260 // Process messages from input buffer. |
260 | 261 |
261 const char* p, *end; | 262 const char* p, *end; |
262 if (input_overflow_buf_.empty()) { | 263 if (input_overflow_buf_.empty()) { |
263 p = input_buf_; | 264 p = input_buf_; |
264 end = p + bytes_read; | 265 end = p + bytes_read; |
265 } else { | 266 } else { |
(...skipping 30 matching lines...) Loading... |
296 } | 297 } |
297 } | 298 } |
298 input_overflow_buf_.assign(p, end - p); | 299 input_overflow_buf_.assign(p, end - p); |
299 | 300 |
300 bytes_read = 0; // Get more data. | 301 bytes_read = 0; // Get more data. |
301 } | 302 } |
302 | 303 |
303 return true; | 304 return true; |
304 } | 305 } |
305 | 306 |
306 bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, | 307 bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, |
307 DWORD bytes_written) { | 308 DWORD bytes_written) { |
308 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's | 309 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's |
309 // no connection? | 310 // no connection? |
310 | 311 |
311 if (output_state_.is_pending) { | 312 if (output_state_.is_pending) { |
312 DCHECK(context); | 313 DCHECK(context); |
313 MessageLoopForIO::current()->RegisterIOContext(&output_state_.overlapped, | |
314 NULL); | |
315 output_state_.is_pending = false; | 314 output_state_.is_pending = false; |
316 if (!context || bytes_written == 0) { | 315 if (!context || bytes_written == 0) { |
317 DWORD err = GetLastError(); | 316 DWORD err = GetLastError(); |
318 LOG(ERROR) << "pipe error: " << err; | 317 LOG(ERROR) << "pipe error: " << err; |
319 return false; | 318 return false; |
320 } | 319 } |
321 // Message was sent. | 320 // Message was sent. |
322 DCHECK(!output_queue_.empty()); | 321 DCHECK(!output_queue_.empty()); |
323 Message* m = output_queue_.front(); | 322 Message* m = output_queue_.front(); |
324 output_queue_.pop(); | 323 output_queue_.pop(); |
325 delete m; | 324 delete m; |
326 } | 325 } |
327 | 326 |
328 while (!output_queue_.empty()) { | 327 if (output_queue_.empty()) |
329 // Write to pipe... | 328 return true; |
330 Message* m = output_queue_.front(); | 329 |
331 BOOL ok = WriteFile(pipe_, | 330 if (INVALID_HANDLE_VALUE == pipe_) |
332 m->data(), | 331 return false; |
333 m->size(), | 332 |
334 &bytes_written, | 333 // Write to pipe... |
335 &output_state_.overlapped); | 334 Message* m = output_queue_.front(); |
336 if (!ok) { | 335 BOOL ok = WriteFile(pipe_, |
337 DWORD err = GetLastError(); | 336 m->data(), |
338 if (err == ERROR_IO_PENDING) { | 337 m->size(), |
339 MessageLoopForIO::current()->RegisterIOContext( | 338 &bytes_written, |
340 &output_state_.overlapped, this); | 339 &output_state_.context.overlapped); |
341 output_state_.is_pending = true; | 340 if (!ok) { |
| 341 DWORD err = GetLastError(); |
| 342 if (err == ERROR_IO_PENDING) { |
| 343 output_state_.is_pending = true; |
342 | 344 |
343 #ifdef IPC_MESSAGE_DEBUG_EXTRA | 345 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
344 DLOG(INFO) << "sent pending message @" << m << " on channel @" << | 346 DLOG(INFO) << "sent pending message @" << m << " on channel @" << |
345 this << " with type " << m->type(); | 347 this << " with type " << m->type(); |
346 #endif | 348 #endif |
347 | 349 |
348 return true; | 350 return true; |
349 } | |
350 LOG(ERROR) << "pipe error: " << err; | |
351 return false; | |
352 } | 351 } |
353 DCHECK(bytes_written == m->size()); | 352 LOG(ERROR) << "pipe error: " << err; |
354 output_queue_.pop(); | 353 return false; |
| 354 } |
355 | 355 |
356 #ifdef IPC_MESSAGE_DEBUG_EXTRA | 356 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
357 DLOG(INFO) << "sent message @" << m << " on channel @" << this << | 357 DLOG(INFO) << "sent message @" << m << " on channel @" << this << |
358 " with type " << m->type(); | 358 " with type " << m->type(); |
359 #endif | 359 #endif |
360 | 360 |
361 delete m; | 361 output_state_.is_pending = true; |
362 } | |
363 | |
364 return true; | 362 return true; |
365 } | 363 } |
366 | 364 |
367 bool Channel::ProcessPendingMessages(DWORD max_wait_msec) { | 365 bool Channel::ProcessPendingMessages(DWORD max_wait_msec) { |
368 return false; | 366 return false; |
369 // TODO(darin): this code is broken and leads to busy waiting | 367 // TODO(darin): this code is broken and leads to busy waiting |
370 #if 0 | 368 #if 0 |
371 DCHECK(max_wait_msec <= 0x7FFFFFFF || max_wait_msec == INFINITE); | 369 DCHECK(max_wait_msec <= 0x7FFFFFFF || max_wait_msec == INFINITE); |
372 | 370 |
373 HANDLE events[] = { | 371 HANDLE events[] = { |
(...skipping 19 matching lines...) Loading... |
393 DLOG(WARNING) << "Would recurse into ProcessIncomingMessages"; | 391 DLOG(WARNING) << "Would recurse into ProcessIncomingMessages"; |
394 } else { | 392 } else { |
395 OnObjectSignaled(events[i]); | 393 OnObjectSignaled(events[i]); |
396 } | 394 } |
397 } | 395 } |
398 } | 396 } |
399 return rv; | 397 return rv; |
400 #endif | 398 #endif |
401 } | 399 } |
402 | 400 |
403 void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, | 401 void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context, |
404 DWORD error) { | 402 DWORD bytes_transfered, DWORD error) { |
405 bool ok; | 403 bool ok; |
406 if (context == &input_state_.overlapped) { | 404 if (context == &input_state_.context) { |
407 if (waiting_connect_) { | 405 if (waiting_connect_) { |
408 ProcessConnection(); | 406 if (!ProcessConnection()) |
| 407 return; |
409 // We may have some messages queued up to send... | 408 // We may have some messages queued up to send... |
410 if (!output_queue_.empty() && !output_state_.is_pending) | 409 if (!output_queue_.empty() && !output_state_.is_pending) |
411 ProcessOutgoingMessages(NULL, 0); | 410 ProcessOutgoingMessages(NULL, 0); |
412 if (input_state_.is_pending) | 411 if (input_state_.is_pending) |
413 return; | 412 return; |
414 // else, fall-through and look for incoming messages... | 413 // else, fall-through and look for incoming messages... |
415 } | 414 } |
416 // we don't support recursion through OnMessageReceived yet! | 415 // we don't support recursion through OnMessageReceived yet! |
417 DCHECK(!processing_incoming_); | 416 DCHECK(!processing_incoming_); |
418 processing_incoming_ = true; | 417 processing_incoming_ = true; |
419 ok = ProcessIncomingMessages(context, bytes_transfered); | 418 ok = ProcessIncomingMessages(context, bytes_transfered); |
420 processing_incoming_ = false; | 419 processing_incoming_ = false; |
421 } else { | 420 } else { |
422 DCHECK(context == &output_state_.overlapped); | 421 DCHECK(context == &output_state_.context); |
423 ok = ProcessOutgoingMessages(context, bytes_transfered); | 422 ok = ProcessOutgoingMessages(context, bytes_transfered); |
424 } | 423 } |
425 if (!ok) { | 424 if (!ok && INVALID_HANDLE_VALUE != pipe_) { |
| 425 // We don't want to re-enter Close(). |
426 Close(); | 426 Close(); |
427 listener_->OnChannelError(); | 427 listener_->OnChannelError(); |
428 } | 428 } |
429 } | 429 } |
430 | 430 |
431 } | 431 } |
OLD | NEW |