Index: remoting/protocol/message_reader.cc |
diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc |
index 7b818ee7da90a0e61059cbf92aa5a59d2f67243e..fd06eac9025401dfaff264e4895a47f066492a12 100644 |
--- a/remoting/protocol/message_reader.cc |
+++ b/remoting/protocol/message_reader.cc |
@@ -16,14 +16,64 @@ namespace protocol { |
static const int kReadBufferSize = 4096; |
+class MessageReader::DoneTaskHandler |
+ : public base::RefCountedThreadSafe<MessageReader::DoneTaskHandler> { |
+ public: |
+ DoneTaskHandler(MessageReader* message_reader) |
+ : message_loop_(MessageLoop::current()), |
+ message_reader_(message_reader) { |
+ } |
+ |
+ ~DoneTaskHandler() { } |
+ |
+ void ReleaseReader() { |
+ DCHECK_EQ(message_loop_, MessageLoop::current()); |
+ AutoLock auto_lock(lock_); |
+ message_reader_ = NULL; |
+ } |
+ |
+ void OnDone(CompoundBuffer* buffer) { |
awong
2011/01/20 20:06:38
2 thoughts:
1) I wonder if we shouldn't delete t
Sergey Ulanov
2011/01/20 21:55:57
It would be great if we could use WeakPtr here, bu
awong
2011/01/20 22:08:02
Good point. Though I think if you do the post bac
Sergey Ulanov
2011/01/21 03:45:05
Destructor may be called on a different thread, an
awong
2011/01/21 19:19:33
Something about this still does not feel right. I
|
+ delete buffer; |
+ ProcessDoneEvent(); |
+ } |
+ |
+ private: |
+ void ProcessDoneEvent() { |
+ if (MessageLoop::current() != message_loop_) { |
+ message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
+ this, &DoneTaskHandler::ProcessDoneEvent)); |
+ return; |
+ } |
+ |
+ AutoLock auto_lock(lock_); |
+ if (message_reader_) { |
+ message_reader_->OnMessageDone(); |
+ } |
+ } |
+ |
+ // Network message loop on which this object is created and which is |
+ // used to call MessageReader::OnMessageDone(). |
+ MessageLoop* message_loop_; |
+ |
+ // MessageReader that owns this object. |
+ MessageReader* message_reader_; |
+ |
+ // Must be locked whenever |message_reader_| is accessed |
+ // (ReleaseReader() may be called on any thread). |
+ Lock lock_; |
+}; |
+ |
MessageReader::MessageReader() |
: socket_(NULL), |
+ read_pending_(false), |
+ pending_messages_(0), |
closed_(false), |
ALLOW_THIS_IN_INITIALIZER_LIST( |
read_callback_(this, &MessageReader::OnRead)) { |
} |
MessageReader::~MessageReader() { |
+ done_task_handler_->ReleaseReader(); |
} |
void MessageReader::Init(net::Socket* socket, |
@@ -31,21 +81,25 @@ void MessageReader::Init(net::Socket* socket, |
message_received_callback_.reset(callback); |
DCHECK(socket); |
socket_ = socket; |
+ done_task_handler_ = new DoneTaskHandler(this); |
DoRead(); |
} |
void MessageReader::DoRead() { |
- while (!closed_) { |
+ // Don't try to read again if there is another read pending or we |
+ // have messages that we haven't finished processing yet. |
+ while (!closed_ && !read_pending_ && pending_messages_ == 0) { |
Alpha Left Google
2011/01/20 20:54:52
The flag of |read_pending| is confusing to me. Acc
Sergey Ulanov
2011/01/20 21:55:57
It is here mainly for the second iteration, after
|
read_buffer_ = new net::IOBuffer(kReadBufferSize); |
int result = socket_->Read( |
read_buffer_, kReadBufferSize, &read_callback_); |
HandleReadResult(result); |
- if (result < 0) |
- break; |
} |
} |
void MessageReader::OnRead(int result) { |
+ DCHECK(read_pending_); |
+ read_pending_ = false; |
+ |
if (!closed_) { |
HandleReadResult(result); |
DoRead(); |
@@ -58,7 +112,9 @@ void MessageReader::HandleReadResult(int result) { |
} else { |
if (result == net::ERR_CONNECTION_CLOSED) { |
closed_ = true; |
- } else if (result != net::ERR_IO_PENDING) { |
+ } else if (result == net::ERR_IO_PENDING) { |
+ read_pending_ = true; |
+ } else { |
LOG(ERROR) << "Read() returned error " << result; |
} |
} |
@@ -68,13 +124,22 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { |
message_decoder_.AddData(data, data_size); |
while (true) { |
- CompoundBuffer buffer; |
- if (!message_decoder_.GetNextMessage(&buffer)) |
+ CompoundBuffer* buffer = message_decoder_.GetNextMessage(); |
+ if (!buffer) |
break; |
- message_received_callback_->Run(&buffer); |
+ pending_messages_++; |
Alpha Left Google
2011/01/20 20:54:52
I think there's a bug here. Incrementing here will
Sergey Ulanov
2011/01/20 21:55:57
Good catch! Will also add unittest for this case.
|
+ message_received_callback_->Run(buffer, NewRunnableMethod( |
+ done_task_handler_.get(), &DoneTaskHandler::OnDone, buffer)); |
} |
} |
+void MessageReader::OnMessageDone() { |
+ pending_messages_--; |
+ DCHECK_GE(pending_messages_, 0); |
+ |
+ DoRead(); // Start next read if neccessary. |
+} |
+ |
} // namespace protocol |
} // namespace remoting |