Index: remoting/protocol/message_reader.cc |
diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc |
index 309f87dd060f577cfdf1dfbe5499d215babf6000..9dbefdfdf8485793dccc235e443ba5c9465d295c 100644 |
--- a/remoting/protocol/message_reader.cc |
+++ b/remoting/protocol/message_reader.cc |
@@ -9,6 +9,7 @@ |
#include "base/compiler_specific.h" |
#include "base/location.h" |
#include "base/thread_task_runner_handle.h" |
+#include "base/single_thread_task_runner.h" |
#include "net/base/io_buffer.h" |
#include "net/base/net_errors.h" |
#include "net/socket/socket.h" |
@@ -34,13 +35,13 @@ void MessageReader::Init(net::Socket* socket, |
message_received_callback_ = callback; |
DCHECK(socket); |
socket_ = socket; |
- DoRead(); |
+ DoRead(false); |
} |
MessageReader::~MessageReader() { |
} |
-void MessageReader::DoRead() { |
+void MessageReader::DoRead(bool in_callback) { |
DCHECK(CalledOnValidThread()); |
// Don't try to read again if there is another read pending or we |
// have messages that we haven't finished processing yet. |
@@ -49,7 +50,7 @@ void MessageReader::DoRead() { |
int result = socket_->Read( |
read_buffer_, kReadBufferSize, |
base::Bind(&MessageReader::OnRead, weak_factory_.GetWeakPtr())); |
- HandleReadResult(result); |
+ HandleReadResult(result, in_callback); |
} |
} |
@@ -59,18 +60,18 @@ void MessageReader::OnRead(int result) { |
read_pending_ = false; |
if (!closed_) { |
- HandleReadResult(result); |
- DoRead(); |
+ HandleReadResult(result, false); |
+ DoRead(false); |
} |
} |
-void MessageReader::HandleReadResult(int result) { |
+void MessageReader::HandleReadResult(int result, bool in_callback) { |
DCHECK(CalledOnValidThread()); |
if (closed_) |
return; |
if (result > 0) { |
- OnDataReceived(read_buffer_, result); |
+ OnDataReceived(read_buffer_, result, in_callback); |
} else if (result == net::ERR_IO_PENDING) { |
read_pending_ = true; |
} else { |
@@ -82,7 +83,8 @@ void MessageReader::HandleReadResult(int result) { |
} |
} |
-void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { |
+void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size, |
+ bool in_callback) { |
DCHECK(CalledOnValidThread()); |
message_decoder_.AddData(data, data_size); |
@@ -100,20 +102,32 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { |
for (std::vector<CompoundBuffer*>::iterator it = new_messages.begin(); |
it != new_messages.end(); ++it) { |
- message_received_callback_.Run( |
- scoped_ptr<CompoundBuffer>(*it), |
- base::Bind(&MessageReader::OnMessageDone, |
- weak_factory_.GetWeakPtr())); |
+ scoped_ptr<CompoundBuffer> message(*it); |
+ if (in_callback) { |
+ base::ThreadTaskRunnerHandle::Get()->PostTask( |
+ FROM_HERE, base::Bind(&MessageReader::RunCallback, |
+ weak_factory_.GetWeakPtr(), |
+ base::Passed(&message))); |
+ } else { |
+ RunCallback(message.Pass()); |
+ } |
} |
} |
+void MessageReader::RunCallback(scoped_ptr<CompoundBuffer> message) { |
+ message_received_callback_.Run( |
+ message.Pass(), base::Bind(&MessageReader::OnMessageDone, |
+ weak_factory_.GetWeakPtr())); |
+} |
+ |
void MessageReader::OnMessageDone() { |
DCHECK(CalledOnValidThread()); |
pending_messages_--; |
DCHECK_GE(pending_messages_, 0); |
+ // Start next read if necessary. |
if (!read_pending_) |
Wez
2012/09/13 01:00:09
nit: DoRead() already checks |read_pending_| so yo
Sergey Ulanov
2012/09/13 01:40:21
Done.
|
- DoRead(); // Start next read if necessary. |
+ DoRead(true); |
Wez
2012/09/13 01:00:09
Wouldn't it be simpler to have a member that you s
Sergey Ulanov
2012/09/13 01:40:21
I actually noticed another bug in this code - it d
|
} |
} // namespace protocol |