Chromium Code Reviews| Index: remoting/protocol/message_reader.cc |
| diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc |
| index 7b818ee7da90a0e61059cbf92aa5a59d2f67243e..fd06eac9025401dfaff264e4895a47f066492a12 100644 |
| --- a/remoting/protocol/message_reader.cc |
| +++ b/remoting/protocol/message_reader.cc |
| @@ -16,14 +16,64 @@ namespace protocol { |
| static const int kReadBufferSize = 4096; |
| +class MessageReader::DoneTaskHandler |
| + : public base::RefCountedThreadSafe<MessageReader::DoneTaskHandler> { |
| + public: |
| + DoneTaskHandler(MessageReader* message_reader) |
| + : message_loop_(MessageLoop::current()), |
| + message_reader_(message_reader) { |
| + } |
| + |
| + ~DoneTaskHandler() { } |
| + |
| + void ReleaseReader() { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + AutoLock auto_lock(lock_); |
| + message_reader_ = NULL; |
| + } |
| + |
| + void OnDone(CompoundBuffer* buffer) { |
|
awong
2011/01/20 20:06:38
2 thoughts:
1) I wonder if we shouldn't delete t
Sergey Ulanov
2011/01/20 21:55:57
It would be great if we could use WeakPtr here, bu
awong
2011/01/20 22:08:02
Good point. Though I think if you do the post bac
Sergey Ulanov
2011/01/21 03:45:05
Destructor may be called on a different thread, an
awong
2011/01/21 19:19:33
Something about this still does not feel right. I
|
| + delete buffer; |
| + ProcessDoneEvent(); |
| + } |
| + |
| + private: |
| + void ProcessDoneEvent() { |
| + if (MessageLoop::current() != message_loop_) { |
| + message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
| + this, &DoneTaskHandler::ProcessDoneEvent)); |
| + return; |
| + } |
| + |
| + AutoLock auto_lock(lock_); |
| + if (message_reader_) { |
| + message_reader_->OnMessageDone(); |
| + } |
| + } |
| + |
| + // Network message loop on which this object is created and which is |
| + // used to call MessageReader::OnMessageDone(). |
| + MessageLoop* message_loop_; |
| + |
| + // MessageReader that owns this object. |
| + MessageReader* message_reader_; |
| + |
| + // Must be locked whenever |message_reader_| is accessed |
| + // (ReleaseReader() may be called on any thread). |
| + Lock lock_; |
| +}; |
| + |
| MessageReader::MessageReader() |
| : socket_(NULL), |
| + read_pending_(false), |
| + pending_messages_(0), |
| closed_(false), |
| ALLOW_THIS_IN_INITIALIZER_LIST( |
| read_callback_(this, &MessageReader::OnRead)) { |
| } |
| MessageReader::~MessageReader() { |
| + done_task_handler_->ReleaseReader(); |
| } |
| void MessageReader::Init(net::Socket* socket, |
| @@ -31,21 +81,25 @@ void MessageReader::Init(net::Socket* socket, |
| message_received_callback_.reset(callback); |
| DCHECK(socket); |
| socket_ = socket; |
| + done_task_handler_ = new DoneTaskHandler(this); |
| DoRead(); |
| } |
| void MessageReader::DoRead() { |
| - while (!closed_) { |
| + // Don't try to read again if there is another read pending or we |
| + // have messages that we haven't finished processing yet. |
| + while (!closed_ && !read_pending_ && pending_messages_ == 0) { |
|
Alpha Left Google
2011/01/20 20:54:52
The flag of |read_pending| is confusing to me. Acc
Sergey Ulanov
2011/01/20 21:55:57
It is here mainly for the second iteration, after
|
| read_buffer_ = new net::IOBuffer(kReadBufferSize); |
| int result = socket_->Read( |
| read_buffer_, kReadBufferSize, &read_callback_); |
| HandleReadResult(result); |
| - if (result < 0) |
| - break; |
| } |
| } |
| void MessageReader::OnRead(int result) { |
| + DCHECK(read_pending_); |
| + read_pending_ = false; |
| + |
| if (!closed_) { |
| HandleReadResult(result); |
| DoRead(); |
| @@ -58,7 +112,9 @@ void MessageReader::HandleReadResult(int result) { |
| } else { |
| if (result == net::ERR_CONNECTION_CLOSED) { |
| closed_ = true; |
| - } else if (result != net::ERR_IO_PENDING) { |
| + } else if (result == net::ERR_IO_PENDING) { |
| + read_pending_ = true; |
| + } else { |
| LOG(ERROR) << "Read() returned error " << result; |
| } |
| } |
| @@ -68,13 +124,22 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { |
| message_decoder_.AddData(data, data_size); |
| while (true) { |
| - CompoundBuffer buffer; |
| - if (!message_decoder_.GetNextMessage(&buffer)) |
| + CompoundBuffer* buffer = message_decoder_.GetNextMessage(); |
| + if (!buffer) |
| break; |
| - message_received_callback_->Run(&buffer); |
| + pending_messages_++; |
|
Alpha Left Google
2011/01/20 20:54:52
I think there's a bug here. Incrementing here will
Sergey Ulanov
2011/01/20 21:55:57
Good catch! Will also add unittest for this case.
|
| + message_received_callback_->Run(buffer, NewRunnableMethod( |
| + done_task_handler_.get(), &DoneTaskHandler::OnDone, buffer)); |
| } |
| } |
| +void MessageReader::OnMessageDone() { |
| + pending_messages_--; |
| + DCHECK_GE(pending_messages_, 0); |
| + |
| + DoRead(); // Start next read if neccessary. |
| +} |
| + |
| } // namespace protocol |
| } // namespace remoting |