Index: chrome/common/ipc_channel.cc |
=================================================================== |
--- chrome/common/ipc_channel.cc (revision 4870) |
+++ chrome/common/ipc_channel.cc (working copy) |
@@ -20,24 +20,21 @@ |
//------------------------------------------------------------------------------ |
-Channel::State::State() |
- : is_pending(false) { |
- memset(&overlapped, 0, sizeof(overlapped)); |
- overlapped.hEvent = CreateEvent(NULL, // default security attributes |
- TRUE, // manual-reset event |
- TRUE, // initial state = signaled |
- NULL); // unnamed event object |
+Channel::State::State(Channel* channel) : is_pending(false) { |
+ memset(&context.overlapped, 0, sizeof(context.overlapped)); |
+ context.handler = channel; |
} |
Channel::State::~State() { |
- if (overlapped.hEvent) |
- CloseHandle(overlapped.hEvent); |
+ COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context); |
} |
//------------------------------------------------------------------------------ |
Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) |
- : pipe_(INVALID_HANDLE_VALUE), |
+ : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), |
+ ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), |
+ pipe_(INVALID_HANDLE_VALUE), |
listener_(listener), |
waiting_connect_(mode == MODE_SERVER), |
processing_incoming_(false), |
@@ -50,23 +47,29 @@ |
} |
void Channel::Close() { |
- // make sure we are no longer watching the pipe events |
- MessageLoopForIO* loop = MessageLoopForIO::current(); |
- if (input_state_.is_pending) { |
- input_state_.is_pending = false; |
- loop->RegisterIOContext(&input_state_.overlapped, NULL); |
+ bool waited = false; |
+ if (input_state_.is_pending || output_state_.is_pending) { |
+ CancelIo(pipe_); |
+ waited = true; |
} |
- if (output_state_.is_pending) { |
- output_state_.is_pending = false; |
- loop->RegisterIOContext(&output_state_.overlapped, NULL); |
- } |
- |
+ // Closing the handle at this point prevents us from issuing more requests |
+ // form OnIOCompleted(). |
if (pipe_ != INVALID_HANDLE_VALUE) { |
CloseHandle(pipe_); |
pipe_ = INVALID_HANDLE_VALUE; |
} |
+ // Make sure all IO has completed. |
+ base::Time start = base::Time::Now(); |
+ while (input_state_.is_pending || output_state_.is_pending) { |
+ MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); |
+ } |
+ if (waited) { |
+ // We want to see if we block the message loop for too long. |
+ UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start); |
+ } |
+ |
while (!output_queue_.empty()) { |
Message* m = output_queue_.front(); |
output_queue_.pop(); |
@@ -175,7 +178,7 @@ |
// to true, we indicate to OnIOCompleted that this is the special |
// initialization signal. |
MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( |
- &Channel::OnIOCompleted, &input_state_.overlapped, 0, 0)); |
+ &Channel::OnIOCompleted, &input_state_.context, 0, 0)); |
} |
if (!waiting_connect_) |
@@ -184,16 +187,15 @@ |
} |
bool Channel::ProcessConnection() { |
- if (input_state_.is_pending) { |
+ if (input_state_.is_pending) |
input_state_.is_pending = false; |
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, |
- NULL); |
- } |
// Do we have a client connected to our pipe? |
- DCHECK(pipe_ != INVALID_HANDLE_VALUE); |
- BOOL ok = ConnectNamedPipe(pipe_, &input_state_.overlapped); |
+ if (INVALID_HANDLE_VALUE == pipe_) |
+ return false; |
+ BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); |
+ |
DWORD err = GetLastError(); |
if (ok) { |
// Uhm, the API documentation says that this function should never |
@@ -205,8 +207,6 @@ |
switch (err) { |
case ERROR_IO_PENDING: |
input_state_.is_pending = true; |
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, |
- this); |
break; |
case ERROR_PIPE_CONNECTED: |
waiting_connect_ = false; |
@@ -219,40 +219,41 @@ |
return true; |
} |
-bool Channel::ProcessIncomingMessages(OVERLAPPED* context, |
+bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context, |
DWORD bytes_read) { |
if (input_state_.is_pending) { |
input_state_.is_pending = false; |
DCHECK(context); |
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, |
- NULL); |
if (!context || !bytes_read) |
return false; |
} else { |
// This happens at channel initialization. |
- DCHECK(!bytes_read && context == &input_state_.overlapped); |
+ DCHECK(!bytes_read && context == &input_state_.context); |
} |
for (;;) { |
if (bytes_read == 0) { |
+ if (INVALID_HANDLE_VALUE == pipe_) |
+ return false; |
+ |
// Read from pipe... |
BOOL ok = ReadFile(pipe_, |
input_buf_, |
BUF_SIZE, |
&bytes_read, |
- &input_state_.overlapped); |
+ &input_state_.context.overlapped); |
if (!ok) { |
DWORD err = GetLastError(); |
if (err == ERROR_IO_PENDING) { |
- MessageLoopForIO::current()->RegisterIOContext( |
- &input_state_.overlapped, this); |
input_state_.is_pending = true; |
return true; |
} |
LOG(ERROR) << "pipe error: " << err; |
return false; |
} |
+ input_state_.is_pending = true; |
+ return true; |
} |
DCHECK(bytes_read); |
@@ -303,15 +304,13 @@ |
return true; |
} |
-bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, |
+bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, |
DWORD bytes_written) { |
DCHECK(!waiting_connect_); // Why are we trying to send messages if there's |
// no connection? |
if (output_state_.is_pending) { |
DCHECK(context); |
- MessageLoopForIO::current()->RegisterIOContext(&output_state_.overlapped, |
- NULL); |
output_state_.is_pending = false; |
if (!context || bytes_written == 0) { |
DWORD err = GetLastError(); |
@@ -325,42 +324,41 @@ |
delete m; |
} |
- while (!output_queue_.empty()) { |
- // Write to pipe... |
- Message* m = output_queue_.front(); |
- BOOL ok = WriteFile(pipe_, |
- m->data(), |
- m->size(), |
- &bytes_written, |
- &output_state_.overlapped); |
- if (!ok) { |
- DWORD err = GetLastError(); |
- if (err == ERROR_IO_PENDING) { |
- MessageLoopForIO::current()->RegisterIOContext( |
- &output_state_.overlapped, this); |
- output_state_.is_pending = true; |
+ if (output_queue_.empty()) |
+ return true; |
+ if (INVALID_HANDLE_VALUE == pipe_) |
+ return false; |
+ |
+ // Write to pipe... |
+ Message* m = output_queue_.front(); |
+ BOOL ok = WriteFile(pipe_, |
+ m->data(), |
+ m->size(), |
+ &bytes_written, |
+ &output_state_.context.overlapped); |
+ if (!ok) { |
+ DWORD err = GetLastError(); |
+ if (err == ERROR_IO_PENDING) { |
+ output_state_.is_pending = true; |
+ |
#ifdef IPC_MESSAGE_DEBUG_EXTRA |
- DLOG(INFO) << "sent pending message @" << m << " on channel @" << |
- this << " with type " << m->type(); |
+ DLOG(INFO) << "sent pending message @" << m << " on channel @" << |
+ this << " with type " << m->type(); |
#endif |
- return true; |
- } |
- LOG(ERROR) << "pipe error: " << err; |
- return false; |
+ return true; |
} |
- DCHECK(bytes_written == m->size()); |
- output_queue_.pop(); |
+ LOG(ERROR) << "pipe error: " << err; |
+ return false; |
+ } |
#ifdef IPC_MESSAGE_DEBUG_EXTRA |
- DLOG(INFO) << "sent message @" << m << " on channel @" << this << |
- " with type " << m->type(); |
+ DLOG(INFO) << "sent message @" << m << " on channel @" << this << |
+ " with type " << m->type(); |
#endif |
- delete m; |
- } |
- |
+ output_state_.is_pending = true; |
return true; |
} |
@@ -400,12 +398,13 @@ |
#endif |
} |
-void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, |
- DWORD error) { |
+void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context, |
+ DWORD bytes_transfered, DWORD error) { |
bool ok; |
- if (context == &input_state_.overlapped) { |
+ if (context == &input_state_.context) { |
if (waiting_connect_) { |
- ProcessConnection(); |
+ if (!ProcessConnection()) |
+ return; |
// We may have some messages queued up to send... |
if (!output_queue_.empty() && !output_state_.is_pending) |
ProcessOutgoingMessages(NULL, 0); |
@@ -419,10 +418,11 @@ |
ok = ProcessIncomingMessages(context, bytes_transfered); |
processing_incoming_ = false; |
} else { |
- DCHECK(context == &output_state_.overlapped); |
+ DCHECK(context == &output_state_.context); |
ok = ProcessOutgoingMessages(context, bytes_transfered); |
} |
- if (!ok) { |
+ if (!ok && INVALID_HANDLE_VALUE != pipe_) { |
+ // We don't want to re-enter Close(). |
Close(); |
listener_->OnChannelError(); |
} |