| Index: remoting/protocol/message_reader.cc
|
| diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc
|
| index 7b818ee7da90a0e61059cbf92aa5a59d2f67243e..fd06eac9025401dfaff264e4895a47f066492a12 100644
|
| --- a/remoting/protocol/message_reader.cc
|
| +++ b/remoting/protocol/message_reader.cc
|
| @@ -16,14 +16,64 @@ namespace protocol {
|
|
|
| static const int kReadBufferSize = 4096;
|
|
|
| +class MessageReader::DoneTaskHandler
|
| + : public base::RefCountedThreadSafe<MessageReader::DoneTaskHandler> {
|
| + public:
|
| + DoneTaskHandler(MessageReader* message_reader)
|
| + : message_loop_(MessageLoop::current()),
|
| + message_reader_(message_reader) {
|
| + }
|
| +
|
| + ~DoneTaskHandler() { }
|
| +
|
| + void ReleaseReader() {
|
| + DCHECK_EQ(message_loop_, MessageLoop::current());
|
| + AutoLock auto_lock(lock_);
|
| + message_reader_ = NULL;
|
| + }
|
| +
|
| + void OnDone(CompoundBuffer* buffer) {
|
| + delete buffer;
|
| + ProcessDoneEvent();
|
| + }
|
| +
|
| + private:
|
| + void ProcessDoneEvent() {
|
| + if (MessageLoop::current() != message_loop_) {
|
| + message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
|
| + this, &DoneTaskHandler::ProcessDoneEvent));
|
| + return;
|
| + }
|
| +
|
| + AutoLock auto_lock(lock_);
|
| + if (message_reader_) {
|
| + message_reader_->OnMessageDone();
|
| + }
|
| + }
|
| +
|
| + // Network message loop on which this object is created and which is
|
| + // used to call MessageReader::OnMessageDone().
|
| + MessageLoop* message_loop_;
|
| +
|
| + // MessageReader that owns this object.
|
| + MessageReader* message_reader_;
|
| +
|
| + // Must be locked whenever |message_reader_| is accessed
|
| + // (ReleaseReader() may be called on any thread).
|
| + Lock lock_;
|
| +};
|
| +
|
| MessageReader::MessageReader()
|
| : socket_(NULL),
|
| + read_pending_(false),
|
| + pending_messages_(0),
|
| closed_(false),
|
| ALLOW_THIS_IN_INITIALIZER_LIST(
|
| read_callback_(this, &MessageReader::OnRead)) {
|
| }
|
|
|
| MessageReader::~MessageReader() {
|
| + done_task_handler_->ReleaseReader();
|
| }
|
|
|
| void MessageReader::Init(net::Socket* socket,
|
| @@ -31,21 +81,25 @@ void MessageReader::Init(net::Socket* socket,
|
| message_received_callback_.reset(callback);
|
| DCHECK(socket);
|
| socket_ = socket;
|
| + done_task_handler_ = new DoneTaskHandler(this);
|
| DoRead();
|
| }
|
|
|
| void MessageReader::DoRead() {
|
| - while (!closed_) {
|
| + // Don't try to read again if there is another read pending or we
|
| + // have messages that we haven't finished processing yet.
|
| + while (!closed_ && !read_pending_ && pending_messages_ == 0) {
|
| read_buffer_ = new net::IOBuffer(kReadBufferSize);
|
| int result = socket_->Read(
|
| read_buffer_, kReadBufferSize, &read_callback_);
|
| HandleReadResult(result);
|
| - if (result < 0)
|
| - break;
|
| }
|
| }
|
|
|
| void MessageReader::OnRead(int result) {
|
| + DCHECK(read_pending_);
|
| + read_pending_ = false;
|
| +
|
| if (!closed_) {
|
| HandleReadResult(result);
|
| DoRead();
|
| @@ -58,7 +112,9 @@ void MessageReader::HandleReadResult(int result) {
|
| } else {
|
| if (result == net::ERR_CONNECTION_CLOSED) {
|
| closed_ = true;
|
| - } else if (result != net::ERR_IO_PENDING) {
|
| + } else if (result == net::ERR_IO_PENDING) {
|
| + read_pending_ = true;
|
| + } else {
|
| LOG(ERROR) << "Read() returned error " << result;
|
| }
|
| }
|
| @@ -68,13 +124,22 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
|
| message_decoder_.AddData(data, data_size);
|
|
|
| while (true) {
|
| - CompoundBuffer buffer;
|
| - if (!message_decoder_.GetNextMessage(&buffer))
|
| + CompoundBuffer* buffer = message_decoder_.GetNextMessage();
|
| + if (!buffer)
|
| break;
|
|
|
| - message_received_callback_->Run(&buffer);
|
| + pending_messages_++;
|
| + message_received_callback_->Run(buffer, NewRunnableMethod(
|
| + done_task_handler_.get(), &DoneTaskHandler::OnDone, buffer));
|
| }
|
| }
|
|
|
| +void MessageReader::OnMessageDone() {
|
| + pending_messages_--;
|
| + DCHECK_GE(pending_messages_, 0);
|
| +
|
| + DoRead(); // Start next read if neccessary.
|
| +}
|
| +
|
| } // namespace protocol
|
| } // namespace remoting
|
|
|