Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1021)

Unified Diff: remoting/protocol/message_reader.cc

Issue 6271004: Changed MessageReader so that it doesn't read from the socket if there are (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: proper handling of empty messages Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: remoting/protocol/message_reader.cc
diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc
index 7b818ee7da90a0e61059cbf92aa5a59d2f67243e..fd06eac9025401dfaff264e4895a47f066492a12 100644
--- a/remoting/protocol/message_reader.cc
+++ b/remoting/protocol/message_reader.cc
@@ -16,14 +16,64 @@ namespace protocol {
static const int kReadBufferSize = 4096;
+class MessageReader::DoneTaskHandler
+ : public base::RefCountedThreadSafe<MessageReader::DoneTaskHandler> {
+ public:
+ DoneTaskHandler(MessageReader* message_reader)
+ : message_loop_(MessageLoop::current()),
+ message_reader_(message_reader) {
+ }
+
+ ~DoneTaskHandler() { }
+
+ void ReleaseReader() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ AutoLock auto_lock(lock_);
+ message_reader_ = NULL;
+ }
+
+ void OnDone(CompoundBuffer* buffer) {
awong 2011/01/20 20:06:38 2 thoughts: 1) I wonder if we shouldn't delete t
Sergey Ulanov 2011/01/20 21:55:57 It would be great if we could use WeakPtr here, bu
awong 2011/01/20 22:08:02 Good point. Though I think if you do the post bac
Sergey Ulanov 2011/01/21 03:45:05 Destructor may be called on a different thread, an
awong 2011/01/21 19:19:33 Something about this still does not feel right. I
+ delete buffer;
+ ProcessDoneEvent();
+ }
+
+ private:
+ void ProcessDoneEvent() {
+ if (MessageLoop::current() != message_loop_) {
+ message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
+ this, &DoneTaskHandler::ProcessDoneEvent));
+ return;
+ }
+
+ AutoLock auto_lock(lock_);
+ if (message_reader_) {
+ message_reader_->OnMessageDone();
+ }
+ }
+
+ // Network message loop on which this object is created and which is
+ // used to call MessageReader::OnMessageDone().
+ MessageLoop* message_loop_;
+
+ // MessageReader that owns this object.
+ MessageReader* message_reader_;
+
+ // Must be locked whenever |message_reader_| is accessed
+ // (ReleaseReader() may be called on any thread).
+ Lock lock_;
+};
+
MessageReader::MessageReader()
: socket_(NULL),
+ read_pending_(false),
+ pending_messages_(0),
closed_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(
read_callback_(this, &MessageReader::OnRead)) {
}
MessageReader::~MessageReader() {
+ done_task_handler_->ReleaseReader();
}
void MessageReader::Init(net::Socket* socket,
@@ -31,21 +81,25 @@ void MessageReader::Init(net::Socket* socket,
message_received_callback_.reset(callback);
DCHECK(socket);
socket_ = socket;
+ done_task_handler_ = new DoneTaskHandler(this);
DoRead();
}
void MessageReader::DoRead() {
- while (!closed_) {
+ // Don't try to read again if there is another read pending or we
+ // have messages that we haven't finished processing yet.
+ while (!closed_ && !read_pending_ && pending_messages_ == 0) {
Alpha Left Google 2011/01/20 20:54:52 The flag of |read_pending| is confusing to me. Acc
Sergey Ulanov 2011/01/20 21:55:57 It is here mainly for the second iteration, after
read_buffer_ = new net::IOBuffer(kReadBufferSize);
int result = socket_->Read(
read_buffer_, kReadBufferSize, &read_callback_);
HandleReadResult(result);
- if (result < 0)
- break;
}
}
void MessageReader::OnRead(int result) {
+ DCHECK(read_pending_);
+ read_pending_ = false;
+
if (!closed_) {
HandleReadResult(result);
DoRead();
@@ -58,7 +112,9 @@ void MessageReader::HandleReadResult(int result) {
} else {
if (result == net::ERR_CONNECTION_CLOSED) {
closed_ = true;
- } else if (result != net::ERR_IO_PENDING) {
+ } else if (result == net::ERR_IO_PENDING) {
+ read_pending_ = true;
+ } else {
LOG(ERROR) << "Read() returned error " << result;
}
}
@@ -68,13 +124,22 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
message_decoder_.AddData(data, data_size);
while (true) {
- CompoundBuffer buffer;
- if (!message_decoder_.GetNextMessage(&buffer))
+ CompoundBuffer* buffer = message_decoder_.GetNextMessage();
+ if (!buffer)
break;
- message_received_callback_->Run(&buffer);
+ pending_messages_++;
Alpha Left Google 2011/01/20 20:54:52 I think there's a bug here. Incrementing here will
Sergey Ulanov 2011/01/20 21:55:57 Good catch! Will also add unittest for this case.
+ message_received_callback_->Run(buffer, NewRunnableMethod(
+ done_task_handler_.get(), &DoneTaskHandler::OnDone, buffer));
}
}
+void MessageReader::OnMessageDone() {
+ pending_messages_--;
+ DCHECK_GE(pending_messages_, 0);
+
+ DoRead(); // Start next read if neccessary.
+}
+
} // namespace protocol
} // namespace remoting

Powered by Google App Engine
This is Rietveld 408576698