Index: ipc/ipc_channel_win.cc |
diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc |
index 7b4f174c65ca74074e7e7b8b0b7abe65484b38d0..7e441b65937b17b6dda8d5f839a2c9b19057ed3c 100644 |
--- a/ipc/ipc_channel_win.cc |
+++ b/ipc/ipc_channel_win.cc |
@@ -104,6 +104,109 @@ bool Channel::ChannelImpl::IsNamedServerInitialized( |
return GetLastError() == ERROR_SEM_TIMEOUT; |
} |
+ |
+Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( |
+ char* buffer, |
+ int buffer_len, |
+ int* /* bytes_read */) { |
+ if (INVALID_HANDLE_VALUE == pipe_) |
+ return READ_FAILED; |
+ |
+ DWORD bytes_read = 0; |
+ BOOL ok = ReadFile(pipe_, input_buf_, Channel::kReadBufferSize, |
+ &bytes_read, &input_state_.context.overlapped); |
+ if (!ok) { |
+ DWORD err = GetLastError(); |
+ if (err == ERROR_IO_PENDING) { |
+ input_state_.is_pending = true; |
+ return READ_PENDING; |
+ } |
+ LOG(ERROR) << "pipe error: " << err; |
+ return READ_FAILED; |
+ } |
+ |
+ // We could return READ_SUCCEEDED here. But the way that this code is |
+ // structured we instead go back to the message loop. Our completion port |
+ // will be signalled even in the "synchronously completed" state. |
+ // |
+ // This allows us to potentially process some outgoing messages and |
+ // interleave other work on this thread when we're getting hammered with |
+ // input messages. Potentially, this could be tuned to be more efficient |
+ // with some testing. |
+ input_state_.is_pending = true; |
+ return READ_PENDING; |
+} |
+ |
+bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { |
+ return true; |
+} |
+ |
+void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) { |
+ // The hello message contains one parameter containing the PID. |
+ listener_->OnChannelConnected(MessageIterator(msg).NextInt()); |
+} |
+ |
+bool Channel::ChannelImpl::DidEmptyInputBuffers() { |
+ return true; |
+} |
+ |
+bool Channel::ChannelImpl::DispatchInputData(const char* input_data, |
+ int input_data_len) { |
+ const char* p; |
+ const char* end; |
+ |
+ // Possibly combine with the overflow buffer to make a larger buffer. |
+ if (input_overflow_buf_.empty()) { |
+ p = input_data; |
+ end = input_data + input_data_len; |
+ } else { |
+ if (input_overflow_buf_.size() > |
+ kMaximumMessageSize - input_data_len) { |
+ input_overflow_buf_.clear(); |
+ LOG(ERROR) << "IPC message is too big"; |
+ return false; |
+ } |
+ input_overflow_buf_.append(input_data, input_data_len); |
+ p = input_overflow_buf_.data(); |
+ end = p + input_overflow_buf_.size(); |
+ } |
+ |
+ // Dispatch all complete messages in the data buffer. |
+ while (p < end) { |
+ const char* message_tail = Message::FindNext(p, end); |
+ if (message_tail) { |
+ int len = static_cast<int>(message_tail - p); |
+ Message m(p, len); |
+ if (!WillDispatchInputMessage(&m)) |
+ return false; |
+ |
+ if (IsHelloMessage(m)) |
+ HandleHelloMessage(m); |
+ else |
+ listener_->OnMessageReceived(m); |
+ p = message_tail; |
+ } else { |
+ // Last message is partial. |
+ break; |
+ } |
+ } |
+ |
+ // Save any partial data in the overflow buffer. |
+ input_overflow_buf_.assign(p, end - p); |
+ |
+ if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) |
+ return false; |
+ return true; |
+} |
+ |
+bool Channel::ChannelImpl::IsHelloMessage(const Message& m) const { |
+ return m.routing_id() == MSG_ROUTING_NONE && m.type() == HELLO_MESSAGE_TYPE; |
+} |
+ |
+bool Channel::ChannelImpl::AsyncReadComplete(int bytes_read) { |
+ return DispatchInputData(input_buf_, bytes_read); |
+} |
+ |
// static |
const std::wstring Channel::ChannelImpl::PipeName( |
const std::string& channel_id) { |
@@ -257,89 +360,19 @@ bool Channel::ChannelImpl::ProcessConnection() { |
return true; |
} |
-bool Channel::ChannelImpl::ProcessIncomingMessages( |
- MessageLoopForIO::IOContext* context, |
- DWORD bytes_read) { |
- DCHECK(thread_check_->CalledOnValidThread()); |
- if (input_state_.is_pending) { |
- input_state_.is_pending = false; |
- DCHECK(context); |
- |
- if (!context || !bytes_read) |
+bool Channel::ChannelImpl::ProcessIncomingMessages() { |
+ while (true) { |
+ int bytes_read = 0; |
+ ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, |
+ &bytes_read); |
+ if (read_state == READ_FAILED) |
return false; |
- } else { |
- // This happens at channel initialization. |
- DCHECK(!bytes_read && context == &input_state_.context); |
- } |
- |
- for (;;) { |
- if (bytes_read == 0) { |
- if (INVALID_HANDLE_VALUE == pipe_) |
- return false; |
- |
- // Read from pipe... |
- BOOL ok = ReadFile(pipe_, |
- input_buf_, |
- Channel::kReadBufferSize, |
- &bytes_read, |
- &input_state_.context.overlapped); |
- if (!ok) { |
- DWORD err = GetLastError(); |
- if (err == ERROR_IO_PENDING) { |
- input_state_.is_pending = true; |
- return true; |
- } |
- LOG(ERROR) << "pipe error: " << err; |
- return false; |
- } |
- input_state_.is_pending = true; |
+ if (read_state == READ_PENDING) |
return true; |
- } |
- DCHECK(bytes_read); |
- |
- // Process messages from input buffer. |
- |
- const char* p, *end; |
- if (input_overflow_buf_.empty()) { |
- p = input_buf_; |
- end = p + bytes_read; |
- } else { |
- if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) { |
- input_overflow_buf_.clear(); |
- LOG(ERROR) << "IPC message is too big"; |
- return false; |
- } |
- input_overflow_buf_.append(input_buf_, bytes_read); |
- p = input_overflow_buf_.data(); |
- end = p + input_overflow_buf_.size(); |
- } |
- |
- while (p < end) { |
- const char* message_tail = Message::FindNext(p, end); |
- if (message_tail) { |
- int len = static_cast<int>(message_tail - p); |
- const Message m(p, len); |
- DVLOG(2) << "received message on channel @" << this |
- << " with type " << m.type(); |
- if (m.routing_id() == MSG_ROUTING_NONE && |
- m.type() == HELLO_MESSAGE_TYPE) { |
- // The Hello message contains only the process id. |
- listener_->OnChannelConnected(MessageIterator(m).NextInt()); |
- } else { |
- listener_->OnMessageReceived(m); |
- } |
- p = message_tail; |
- } else { |
- // Last message is partial. |
- break; |
- } |
- } |
- input_overflow_buf_.assign(p, end - p); |
- |
- bytes_read = 0; // Get more data. |
+ DCHECK(bytes_read > 0); |
+ if (!DispatchInputData(input_buf_, bytes_read)) |
+ return false; |
} |
- |
- return true; |
} |
bool Channel::ChannelImpl::ProcessOutgoingMessages( |
@@ -400,8 +433,9 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages( |
} |
void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, |
- DWORD bytes_transfered, DWORD error) { |
- bool ok; |
+ DWORD bytes_transfered, |
+ DWORD error) { |
+ bool ok = true; |
DCHECK(thread_check_->CalledOnValidThread()); |
if (context == &input_state_.context) { |
if (waiting_connect_) { |
@@ -414,10 +448,26 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, |
return; |
// else, fall-through and look for incoming messages... |
} |
- // we don't support recursion through OnMessageReceived yet! |
+ |
+ // We don't support recursion through OnMessageReceived yet! |
DCHECK(!processing_incoming_); |
AutoReset<bool> auto_reset_processing_incoming(&processing_incoming_, true); |
- ok = ProcessIncomingMessages(context, bytes_transfered); |
+ |
+ // Process the new data. |
+ if (input_state_.is_pending) { |
+ // This is the normal case for everything except the initialization step. |
+ input_state_.is_pending = false; |
+ if (!bytes_transfered) |
+ ok = false; |
+ else |
+ ok = AsyncReadComplete(bytes_transfered); |
+ } else { |
+ DCHECK(!bytes_transfered); |
+ } |
+ |
+ // Request more data. |
+ if (ok) |
+ ok = ProcessIncomingMessages(); |
} else { |
DCHECK(context == &output_state_.context); |
ok = ProcessOutgoingMessages(context, bytes_transfered); |