Index: remoting/protocol/message_reader.cc |
diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc |
index 7b818ee7da90a0e61059cbf92aa5a59d2f67243e..3bbfd59e3c6aa6dccac72dcf11b04c069a5cd07a 100644 |
--- a/remoting/protocol/message_reader.cc |
+++ b/remoting/protocol/message_reader.cc |
@@ -18,12 +18,16 @@ static const int kReadBufferSize = 4096; |
MessageReader::MessageReader() |
: socket_(NULL), |
+ message_loop_(NULL), |
+ read_pending_(false), |
+ pending_messages_(0), |
closed_(false), |
ALLOW_THIS_IN_INITIALIZER_LIST( |
read_callback_(this, &MessageReader::OnRead)) { |
} |
MessageReader::~MessageReader() { |
+ CHECK_EQ(pending_messages_, 0); |
} |
void MessageReader::Init(net::Socket* socket, |
@@ -31,21 +35,27 @@ void MessageReader::Init(net::Socket* socket, |
message_received_callback_.reset(callback); |
DCHECK(socket); |
socket_ = socket; |
+ message_loop_ = MessageLoop::current(); |
DoRead(); |
} |
void MessageReader::DoRead() { |
- while (!closed_) { |
+ DCHECK(!read_pending_); |
+ |
+ // Don't try to read again if there is another read pending or we |
+ // have messages that we haven't finished processing yet. |
+ while (!closed_ && !read_pending_ && pending_messages_ == 0) { |
read_buffer_ = new net::IOBuffer(kReadBufferSize); |
int result = socket_->Read( |
read_buffer_, kReadBufferSize, &read_callback_); |
HandleReadResult(result); |
- if (result < 0) |
- break; |
} |
} |
void MessageReader::OnRead(int result) { |
+ DCHECK(read_pending_); |
+ read_pending_ = false; |
+ |
if (!closed_) { |
HandleReadResult(result); |
DoRead(); |
@@ -53,12 +63,17 @@ void MessageReader::OnRead(int result) { |
} |
void MessageReader::HandleReadResult(int result) { |
+ if (closed_) |
+ return; |
+ |
if (result > 0) { |
OnDataReceived(read_buffer_, result); |
} else { |
if (result == net::ERR_CONNECTION_CLOSED) { |
closed_ = true; |
- } else if (result != net::ERR_IO_PENDING) { |
+ } else if (result == net::ERR_IO_PENDING) { |
+ read_pending_ = true; |
+ } else { |
LOG(ERROR) << "Read() returned error " << result; |
} |
} |
@@ -67,14 +82,42 @@ void MessageReader::HandleReadResult(int result) { |
void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { |
message_decoder_.AddData(data, data_size); |
+ // Get list of all new messages first, and then call the callback |
+ // for all of them. |
+ std::vector<CompoundBuffer*> new_messages; |
while (true) { |
- CompoundBuffer buffer; |
- if (!message_decoder_.GetNextMessage(&buffer)) |
+ CompoundBuffer* buffer = message_decoder_.GetNextMessage(); |
+ if (!buffer) |
break; |
+ new_messages.push_back(buffer); |
+ } |
+ |
+ pending_messages_ += new_messages.size(); |
- message_received_callback_->Run(&buffer); |
+ for (std::vector<CompoundBuffer*>::iterator it = new_messages.begin(); |
+ it != new_messages.end(); ++it) { |
+ message_received_callback_->Run(*it, NewRunnableMethod( |
+ this, &MessageReader::OnMessageDone, *it)); |
} |
} |
+void MessageReader::OnMessageDone(CompoundBuffer* message) { |
+ delete message; |
+ ProcessDoneEvent(); |
+} |
+ |
+void MessageReader::ProcessDoneEvent() { |
+ if (MessageLoop::current() != message_loop_) { |
+ message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
+ this, &MessageReader::ProcessDoneEvent)); |
+ return; |
+ } |
+ |
+ pending_messages_--; |
+ DCHECK_GE(pending_messages_, 0); |
+ |
+ DoRead(); // Start next read if neccessary. |
+} |
+ |
} // namespace protocol |
} // namespace remoting |