Chromium Code Reviews

Side by Side Diff: chrome/common/ipc_channel.cc

Issue 8156: Switch MessagePumpForIO to use completion ports on Windows.... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 12 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | | Annotate | Revision Log
« no previous file with comments | « chrome/common/ipc_channel.h ('k') | chrome/common/ipc_sync_channel_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/common/ipc_channel.h" 5 #include "chrome/common/ipc_channel.h"
6 6
7 #include <windows.h> 7 #include <windows.h>
8 #include <sstream> 8 #include <sstream>
9 9
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/win_util.h" 12 #include "base/win_util.h"
13 #include "chrome/common/chrome_counters.h" 13 #include "chrome/common/chrome_counters.h"
14 #include "chrome/common/ipc_logging.h" 14 #include "chrome/common/ipc_logging.h"
15 #include "chrome/common/ipc_message_utils.h" 15 #include "chrome/common/ipc_message_utils.h"
16 16
17 using namespace std; 17 using namespace std;
18 18
19 namespace IPC { 19 namespace IPC {
20 20
21 //------------------------------------------------------------------------------ 21 //------------------------------------------------------------------------------
22 22
23 Channel::State::State() 23 Channel::State::State(Channel* channel) : is_pending(false) {
24 : is_pending(false) { 24 memset(&context.overlapped, 0, sizeof(context.overlapped));
25 memset(&overlapped, 0, sizeof(overlapped)); 25 context.handler = channel;
26 overlapped.hEvent = CreateEvent(NULL, // default security attributes
27 TRUE, // manual-reset event
28 TRUE, // initial state = signaled
29 NULL); // unnamed event object
30 } 26 }
31 27
32 Channel::State::~State() { 28 Channel::State::~State() {
33 if (overlapped.hEvent) 29 COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context);
34 CloseHandle(overlapped.hEvent);
35 } 30 }
36 31
37 //------------------------------------------------------------------------------ 32 //------------------------------------------------------------------------------
38 33
39 Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) 34 Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener)
40 : pipe_(INVALID_HANDLE_VALUE), 35 : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
36 ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
37 pipe_(INVALID_HANDLE_VALUE),
41 listener_(listener), 38 listener_(listener),
42 waiting_connect_(mode == MODE_SERVER), 39 waiting_connect_(mode == MODE_SERVER),
43 processing_incoming_(false), 40 processing_incoming_(false),
44 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { 41 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
45 if (!CreatePipe(channel_id, mode)) { 42 if (!CreatePipe(channel_id, mode)) {
46 // The pipe may have been closed already. 43 // The pipe may have been closed already.
47 LOG(WARNING) << "Unable to create pipe named \"" << channel_id << 44 LOG(WARNING) << "Unable to create pipe named \"" << channel_id <<
48 "\" in " << (mode == 0 ? "server" : "client") << " mode."; 45 "\" in " << (mode == 0 ? "server" : "client") << " mode.";
49 } 46 }
50 } 47 }
51 48
52 void Channel::Close() { 49 void Channel::Close() {
53 // make sure we are no longer watching the pipe events 50 bool waited = false;
54 MessageLoopForIO* loop = MessageLoopForIO::current(); 51 if (input_state_.is_pending || output_state_.is_pending) {
55 if (input_state_.is_pending) { 52 CancelIo(pipe_);
56 input_state_.is_pending = false; 53 waited = true;
57 loop->RegisterIOContext(&input_state_.overlapped, NULL);
58 } 54 }
59 55
60 if (output_state_.is_pending) { 56 // Closing the handle at this point prevents us from issuing more requests
61 output_state_.is_pending = false; 57 // form OnIOCompleted().
62 loop->RegisterIOContext(&output_state_.overlapped, NULL);
63 }
64
65 if (pipe_ != INVALID_HANDLE_VALUE) { 58 if (pipe_ != INVALID_HANDLE_VALUE) {
66 CloseHandle(pipe_); 59 CloseHandle(pipe_);
67 pipe_ = INVALID_HANDLE_VALUE; 60 pipe_ = INVALID_HANDLE_VALUE;
68 } 61 }
69 62
63 // Make sure all IO has completed.
64 base::Time start = base::Time::Now();
65 while (input_state_.is_pending || output_state_.is_pending) {
66 MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this);
67 }
68 if (waited) {
69 // We want to see if we block the message loop for too long.
70 UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start);
71 }
72
70 while (!output_queue_.empty()) { 73 while (!output_queue_.empty()) {
71 Message* m = output_queue_.front(); 74 Message* m = output_queue_.front();
72 output_queue_.pop(); 75 output_queue_.pop();
73 delete m; 76 delete m;
74 } 77 }
75 } 78 }
76 79
77 bool Channel::Send(Message* message) { 80 bool Channel::Send(Message* message) {
78 chrome::Counters::ipc_send_counter().Increment(); 81 chrome::Counters::ipc_send_counter().Increment();
79 #ifdef IPC_MESSAGE_DEBUG_EXTRA 82 #ifdef IPC_MESSAGE_DEBUG_EXTRA
(...skipping 88 matching lines...)
168 171
169 // Check to see if there is a client connected to our pipe... 172 // Check to see if there is a client connected to our pipe...
170 if (waiting_connect_) 173 if (waiting_connect_)
171 ProcessConnection(); 174 ProcessConnection();
172 175
173 if (!input_state_.is_pending) { 176 if (!input_state_.is_pending) {
174 // Complete setup asynchronously. By not setting input_state_.is_pending 177 // Complete setup asynchronously. By not setting input_state_.is_pending
175 // to true, we indicate to OnIOCompleted that this is the special 178 // to true, we indicate to OnIOCompleted that this is the special
176 // initialization signal. 179 // initialization signal.
177 MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( 180 MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod(
178 &Channel::OnIOCompleted, &input_state_.overlapped, 0, 0)); 181 &Channel::OnIOCompleted, &input_state_.context, 0, 0));
179 } 182 }
180 183
181 if (!waiting_connect_) 184 if (!waiting_connect_)
182 ProcessOutgoingMessages(NULL, 0); 185 ProcessOutgoingMessages(NULL, 0);
183 return true; 186 return true;
184 } 187 }
185 188
186 bool Channel::ProcessConnection() { 189 bool Channel::ProcessConnection() {
187 if (input_state_.is_pending) { 190 if (input_state_.is_pending)
188 input_state_.is_pending = false; 191 input_state_.is_pending = false;
189 MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
190 NULL);
191 }
192 192
193 // Do we have a client connected to our pipe? 193 // Do we have a client connected to our pipe?
194 DCHECK(pipe_ != INVALID_HANDLE_VALUE); 194 if (INVALID_HANDLE_VALUE == pipe_)
195 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.overlapped); 195 return false;
196
197 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped);
196 198
197 DWORD err = GetLastError(); 199 DWORD err = GetLastError();
198 if (ok) { 200 if (ok) {
199 // Uhm, the API documentation says that this function should never 201 // Uhm, the API documentation says that this function should never
200 // return success when used in overlapped mode. 202 // return success when used in overlapped mode.
201 NOTREACHED(); 203 NOTREACHED();
202 return false; 204 return false;
203 } 205 }
204 206
205 switch (err) { 207 switch (err) {
206 case ERROR_IO_PENDING: 208 case ERROR_IO_PENDING:
207 input_state_.is_pending = true; 209 input_state_.is_pending = true;
208 MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
209 this);
210 break; 210 break;
211 case ERROR_PIPE_CONNECTED: 211 case ERROR_PIPE_CONNECTED:
212 waiting_connect_ = false; 212 waiting_connect_ = false;
213 break; 213 break;
214 default: 214 default:
215 NOTREACHED(); 215 NOTREACHED();
216 return false; 216 return false;
217 } 217 }
218 218
219 return true; 219 return true;
220 } 220 }
221 221
222 bool Channel::ProcessIncomingMessages(OVERLAPPED* context, 222 bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context,
223 DWORD bytes_read) { 223 DWORD bytes_read) {
224 if (input_state_.is_pending) { 224 if (input_state_.is_pending) {
225 input_state_.is_pending = false; 225 input_state_.is_pending = false;
226 DCHECK(context); 226 DCHECK(context);
227 MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
228 NULL);
229 227
230 if (!context || !bytes_read) 228 if (!context || !bytes_read)
231 return false; 229 return false;
232 } else { 230 } else {
233 // This happens at channel initialization. 231 // This happens at channel initialization.
234 DCHECK(!bytes_read && context == &input_state_.overlapped); 232 DCHECK(!bytes_read && context == &input_state_.context);
235 } 233 }
236 234
237 for (;;) { 235 for (;;) {
238 if (bytes_read == 0) { 236 if (bytes_read == 0) {
237 if (INVALID_HANDLE_VALUE == pipe_)
238 return false;
239
239 // Read from pipe... 240 // Read from pipe...
240 BOOL ok = ReadFile(pipe_, 241 BOOL ok = ReadFile(pipe_,
241 input_buf_, 242 input_buf_,
242 BUF_SIZE, 243 BUF_SIZE,
243 &bytes_read, 244 &bytes_read,
244 &input_state_.overlapped); 245 &input_state_.context.overlapped);
245 if (!ok) { 246 if (!ok) {
246 DWORD err = GetLastError(); 247 DWORD err = GetLastError();
247 if (err == ERROR_IO_PENDING) { 248 if (err == ERROR_IO_PENDING) {
248 MessageLoopForIO::current()->RegisterIOContext(
249 &input_state_.overlapped, this);
250 input_state_.is_pending = true; 249 input_state_.is_pending = true;
251 return true; 250 return true;
252 } 251 }
253 LOG(ERROR) << "pipe error: " << err; 252 LOG(ERROR) << "pipe error: " << err;
254 return false; 253 return false;
255 } 254 }
255 input_state_.is_pending = true;
256 return true;
256 } 257 }
257 DCHECK(bytes_read); 258 DCHECK(bytes_read);
258 259
259 // Process messages from input buffer. 260 // Process messages from input buffer.
260 261
261 const char* p, *end; 262 const char* p, *end;
262 if (input_overflow_buf_.empty()) { 263 if (input_overflow_buf_.empty()) {
263 p = input_buf_; 264 p = input_buf_;
264 end = p + bytes_read; 265 end = p + bytes_read;
265 } else { 266 } else {
(...skipping 30 matching lines...)
296 } 297 }
297 } 298 }
298 input_overflow_buf_.assign(p, end - p); 299 input_overflow_buf_.assign(p, end - p);
299 300
300 bytes_read = 0; // Get more data. 301 bytes_read = 0; // Get more data.
301 } 302 }
302 303
303 return true; 304 return true;
304 } 305 }
305 306
306 bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, 307 bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
307 DWORD bytes_written) { 308 DWORD bytes_written) {
308 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's 309 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
309 // no connection? 310 // no connection?
310 311
311 if (output_state_.is_pending) { 312 if (output_state_.is_pending) {
312 DCHECK(context); 313 DCHECK(context);
313 MessageLoopForIO::current()->RegisterIOContext(&output_state_.overlapped,
314 NULL);
315 output_state_.is_pending = false; 314 output_state_.is_pending = false;
316 if (!context || bytes_written == 0) { 315 if (!context || bytes_written == 0) {
317 DWORD err = GetLastError(); 316 DWORD err = GetLastError();
318 LOG(ERROR) << "pipe error: " << err; 317 LOG(ERROR) << "pipe error: " << err;
319 return false; 318 return false;
320 } 319 }
321 // Message was sent. 320 // Message was sent.
322 DCHECK(!output_queue_.empty()); 321 DCHECK(!output_queue_.empty());
323 Message* m = output_queue_.front(); 322 Message* m = output_queue_.front();
324 output_queue_.pop(); 323 output_queue_.pop();
325 delete m; 324 delete m;
326 } 325 }
327 326
328 while (!output_queue_.empty()) { 327 if (output_queue_.empty())
329 // Write to pipe... 328 return true;
330 Message* m = output_queue_.front(); 329
331 BOOL ok = WriteFile(pipe_, 330 if (INVALID_HANDLE_VALUE == pipe_)
332 m->data(), 331 return false;
333 m->size(), 332
334 &bytes_written, 333 // Write to pipe...
335 &output_state_.overlapped); 334 Message* m = output_queue_.front();
336 if (!ok) { 335 BOOL ok = WriteFile(pipe_,
337 DWORD err = GetLastError(); 336 m->data(),
338 if (err == ERROR_IO_PENDING) { 337 m->size(),
339 MessageLoopForIO::current()->RegisterIOContext( 338 &bytes_written,
340 &output_state_.overlapped, this); 339 &output_state_.context.overlapped);
341 output_state_.is_pending = true; 340 if (!ok) {
341 DWORD err = GetLastError();
342 if (err == ERROR_IO_PENDING) {
343 output_state_.is_pending = true;
342 344
343 #ifdef IPC_MESSAGE_DEBUG_EXTRA 345 #ifdef IPC_MESSAGE_DEBUG_EXTRA
344 DLOG(INFO) << "sent pending message @" << m << " on channel @" << 346 DLOG(INFO) << "sent pending message @" << m << " on channel @" <<
345 this << " with type " << m->type(); 347 this << " with type " << m->type();
346 #endif 348 #endif
347 349
348 return true; 350 return true;
349 }
350 LOG(ERROR) << "pipe error: " << err;
351 return false;
352 } 351 }
353 DCHECK(bytes_written == m->size()); 352 LOG(ERROR) << "pipe error: " << err;
354 output_queue_.pop(); 353 return false;
354 }
355 355
356 #ifdef IPC_MESSAGE_DEBUG_EXTRA 356 #ifdef IPC_MESSAGE_DEBUG_EXTRA
357 DLOG(INFO) << "sent message @" << m << " on channel @" << this << 357 DLOG(INFO) << "sent message @" << m << " on channel @" << this <<
358 " with type " << m->type(); 358 " with type " << m->type();
359 #endif 359 #endif
360 360
361 delete m; 361 output_state_.is_pending = true;
362 }
363
364 return true; 362 return true;
365 } 363 }
366 364
367 bool Channel::ProcessPendingMessages(DWORD max_wait_msec) { 365 bool Channel::ProcessPendingMessages(DWORD max_wait_msec) {
368 return false; 366 return false;
369 // TODO(darin): this code is broken and leads to busy waiting 367 // TODO(darin): this code is broken and leads to busy waiting
370 #if 0 368 #if 0
371 DCHECK(max_wait_msec <= 0x7FFFFFFF || max_wait_msec == INFINITE); 369 DCHECK(max_wait_msec <= 0x7FFFFFFF || max_wait_msec == INFINITE);
372 370
373 HANDLE events[] = { 371 HANDLE events[] = {
(...skipping 19 matching lines...)
393 DLOG(WARNING) << "Would recurse into ProcessIncomingMessages"; 391 DLOG(WARNING) << "Would recurse into ProcessIncomingMessages";
394 } else { 392 } else {
395 OnObjectSignaled(events[i]); 393 OnObjectSignaled(events[i]);
396 } 394 }
397 } 395 }
398 } 396 }
399 return rv; 397 return rv;
400 #endif 398 #endif
401 } 399 }
402 400
403 void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, 401 void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context,
404 DWORD error) { 402 DWORD bytes_transfered, DWORD error) {
405 bool ok; 403 bool ok;
406 if (context == &input_state_.overlapped) { 404 if (context == &input_state_.context) {
407 if (waiting_connect_) { 405 if (waiting_connect_) {
408 ProcessConnection(); 406 if (!ProcessConnection())
407 return;
409 // We may have some messages queued up to send... 408 // We may have some messages queued up to send...
410 if (!output_queue_.empty() && !output_state_.is_pending) 409 if (!output_queue_.empty() && !output_state_.is_pending)
411 ProcessOutgoingMessages(NULL, 0); 410 ProcessOutgoingMessages(NULL, 0);
412 if (input_state_.is_pending) 411 if (input_state_.is_pending)
413 return; 412 return;
414 // else, fall-through and look for incoming messages... 413 // else, fall-through and look for incoming messages...
415 } 414 }
416 // we don't support recursion through OnMessageReceived yet! 415 // we don't support recursion through OnMessageReceived yet!
417 DCHECK(!processing_incoming_); 416 DCHECK(!processing_incoming_);
418 processing_incoming_ = true; 417 processing_incoming_ = true;
419 ok = ProcessIncomingMessages(context, bytes_transfered); 418 ok = ProcessIncomingMessages(context, bytes_transfered);
420 processing_incoming_ = false; 419 processing_incoming_ = false;
421 } else { 420 } else {
422 DCHECK(context == &output_state_.overlapped); 421 DCHECK(context == &output_state_.context);
423 ok = ProcessOutgoingMessages(context, bytes_transfered); 422 ok = ProcessOutgoingMessages(context, bytes_transfered);
424 } 423 }
425 if (!ok) { 424 if (!ok && INVALID_HANDLE_VALUE != pipe_) {
425 // We don't want to re-enter Close().
426 Close(); 426 Close();
427 listener_->OnChannelError(); 427 listener_->OnChannelError();
428 } 428 }
429 } 429 }
430 430
431 } 431 }
OLDNEW
« no previous file with comments | « chrome/common/ipc_channel.h ('k') | chrome/common/ipc_sync_channel_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine