Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(123)

Unified Diff: ipc/ipc_channel_win.cc

Issue 9568031: Refactoring on Windows IPC channel. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « ipc/ipc_channel_win.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: ipc/ipc_channel_win.cc
diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc
index 7b4f174c65ca74074e7e7b8b0b7abe65484b38d0..7e441b65937b17b6dda8d5f839a2c9b19057ed3c 100644
--- a/ipc/ipc_channel_win.cc
+++ b/ipc/ipc_channel_win.cc
@@ -104,6 +104,109 @@ bool Channel::ChannelImpl::IsNamedServerInitialized(
return GetLastError() == ERROR_SEM_TIMEOUT;
}
+
+Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
+ char* buffer,
+ int buffer_len,
+ int* /* bytes_read */) {
+ if (INVALID_HANDLE_VALUE == pipe_)
+ return READ_FAILED;
+
+ DWORD bytes_read = 0;
+ BOOL ok = ReadFile(pipe_, input_buf_, Channel::kReadBufferSize,
+ &bytes_read, &input_state_.context.overlapped);
+ if (!ok) {
+ DWORD err = GetLastError();
+ if (err == ERROR_IO_PENDING) {
+ input_state_.is_pending = true;
+ return READ_PENDING;
+ }
+ LOG(ERROR) << "pipe error: " << err;
+ return READ_FAILED;
+ }
+
+ // We could return READ_SUCCEEDED here. But the way that this code is
+ // structured we instead go back to the message loop. Our completion port
+ // will be signalled even in the "synchronously completed" state.
+ //
+ // This allows us to potentially process some outgoing messages and
+ // interleave other work on this thread when we're getting hammered with
+ // input messages. Potentially, this could be tuned to be more efficient
+ // with some testing.
+ input_state_.is_pending = true;
+ return READ_PENDING;
+}
+
+bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
+ return true;
+}
+
+void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) {
+ // The hello message contains one parameter containing the PID.
+ listener_->OnChannelConnected(MessageIterator(msg).NextInt());
+}
+
+bool Channel::ChannelImpl::DidEmptyInputBuffers() {
+ return true;
+}
+
+bool Channel::ChannelImpl::DispatchInputData(const char* input_data,
+ int input_data_len) {
+ const char* p;
+ const char* end;
+
+ // Possibly combine with the overflow buffer to make a larger buffer.
+ if (input_overflow_buf_.empty()) {
+ p = input_data;
+ end = input_data + input_data_len;
+ } else {
+ if (input_overflow_buf_.size() >
+ kMaximumMessageSize - input_data_len) {
+ input_overflow_buf_.clear();
+ LOG(ERROR) << "IPC message is too big";
+ return false;
+ }
+ input_overflow_buf_.append(input_data, input_data_len);
+ p = input_overflow_buf_.data();
+ end = p + input_overflow_buf_.size();
+ }
+
+ // Dispatch all complete messages in the data buffer.
+ while (p < end) {
+ const char* message_tail = Message::FindNext(p, end);
+ if (message_tail) {
+ int len = static_cast<int>(message_tail - p);
+ Message m(p, len);
+ if (!WillDispatchInputMessage(&m))
+ return false;
+
+ if (IsHelloMessage(m))
+ HandleHelloMessage(m);
+ else
+ listener_->OnMessageReceived(m);
+ p = message_tail;
+ } else {
+ // Last message is partial.
+ break;
+ }
+ }
+
+ // Save any partial data in the overflow buffer.
+ input_overflow_buf_.assign(p, end - p);
+
+ if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
+ return false;
+ return true;
+}
+
+bool Channel::ChannelImpl::IsHelloMessage(const Message& m) const {
+ return m.routing_id() == MSG_ROUTING_NONE && m.type() == HELLO_MESSAGE_TYPE;
+}
+
+bool Channel::ChannelImpl::AsyncReadComplete(int bytes_read) {
+ return DispatchInputData(input_buf_, bytes_read);
+}
+
// static
const std::wstring Channel::ChannelImpl::PipeName(
const std::string& channel_id) {
@@ -257,89 +360,19 @@ bool Channel::ChannelImpl::ProcessConnection() {
return true;
}
-bool Channel::ChannelImpl::ProcessIncomingMessages(
- MessageLoopForIO::IOContext* context,
- DWORD bytes_read) {
- DCHECK(thread_check_->CalledOnValidThread());
- if (input_state_.is_pending) {
- input_state_.is_pending = false;
- DCHECK(context);
-
- if (!context || !bytes_read)
+bool Channel::ChannelImpl::ProcessIncomingMessages() {
+ while (true) {
+ int bytes_read = 0;
+ ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
+ &bytes_read);
+ if (read_state == READ_FAILED)
return false;
- } else {
- // This happens at channel initialization.
- DCHECK(!bytes_read && context == &input_state_.context);
- }
-
- for (;;) {
- if (bytes_read == 0) {
- if (INVALID_HANDLE_VALUE == pipe_)
- return false;
-
- // Read from pipe...
- BOOL ok = ReadFile(pipe_,
- input_buf_,
- Channel::kReadBufferSize,
- &bytes_read,
- &input_state_.context.overlapped);
- if (!ok) {
- DWORD err = GetLastError();
- if (err == ERROR_IO_PENDING) {
- input_state_.is_pending = true;
- return true;
- }
- LOG(ERROR) << "pipe error: " << err;
- return false;
- }
- input_state_.is_pending = true;
+ if (read_state == READ_PENDING)
return true;
- }
- DCHECK(bytes_read);
-
- // Process messages from input buffer.
-
- const char* p, *end;
- if (input_overflow_buf_.empty()) {
- p = input_buf_;
- end = p + bytes_read;
- } else {
- if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) {
- input_overflow_buf_.clear();
- LOG(ERROR) << "IPC message is too big";
- return false;
- }
- input_overflow_buf_.append(input_buf_, bytes_read);
- p = input_overflow_buf_.data();
- end = p + input_overflow_buf_.size();
- }
-
- while (p < end) {
- const char* message_tail = Message::FindNext(p, end);
- if (message_tail) {
- int len = static_cast<int>(message_tail - p);
- const Message m(p, len);
- DVLOG(2) << "received message on channel @" << this
- << " with type " << m.type();
- if (m.routing_id() == MSG_ROUTING_NONE &&
- m.type() == HELLO_MESSAGE_TYPE) {
- // The Hello message contains only the process id.
- listener_->OnChannelConnected(MessageIterator(m).NextInt());
- } else {
- listener_->OnMessageReceived(m);
- }
- p = message_tail;
- } else {
- // Last message is partial.
- break;
- }
- }
- input_overflow_buf_.assign(p, end - p);
-
- bytes_read = 0; // Get more data.
+ DCHECK(bytes_read > 0);
+ if (!DispatchInputData(input_buf_, bytes_read))
+ return false;
}
-
- return true;
}
bool Channel::ChannelImpl::ProcessOutgoingMessages(
@@ -400,8 +433,9 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages(
}
void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
- DWORD bytes_transfered, DWORD error) {
- bool ok;
+ DWORD bytes_transfered,
+ DWORD error) {
+ bool ok = true;
DCHECK(thread_check_->CalledOnValidThread());
if (context == &input_state_.context) {
if (waiting_connect_) {
@@ -414,10 +448,26 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
return;
// else, fall-through and look for incoming messages...
}
- // we don't support recursion through OnMessageReceived yet!
+
+ // We don't support recursion through OnMessageReceived yet!
DCHECK(!processing_incoming_);
AutoReset<bool> auto_reset_processing_incoming(&processing_incoming_, true);
- ok = ProcessIncomingMessages(context, bytes_transfered);
+
+ // Process the new data.
+ if (input_state_.is_pending) {
+ // This is the normal case for everything except the initialization step.
+ input_state_.is_pending = false;
+ if (!bytes_transfered)
+ ok = false;
+ else
+ ok = AsyncReadComplete(bytes_transfered);
+ } else {
+ DCHECK(!bytes_transfered);
+ }
+
+ // Request more data.
+ if (ok)
+ ok = ProcessIncomingMessages();
} else {
DCHECK(context == &output_state_.context);
ok = ProcessOutgoingMessages(context, bytes_transfered);
« no previous file with comments | « ipc/ipc_channel_win.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698