Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(499)

Unified Diff: remoting/protocol/message_reader.h

Issue 6271004: Changed MessageReader so that it doesn't read from the socket if there are (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: ref-counted MessageReader Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « remoting/protocol/message_decoder_unittest.cc ('k') | remoting/protocol/message_reader.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: remoting/protocol/message_reader.h
diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h
index d4937780590fd5bff33d29bfb203f354c9120c84..d14cb90b8a651f812454513faac1455f0a3e0319 100644
--- a/remoting/protocol/message_reader.h
+++ b/remoting/protocol/message_reader.h
@@ -13,6 +13,8 @@
#include "remoting/base/compound_buffer.h"
#include "remoting/protocol/message_decoder.h"
+class MessageLoop;
+
namespace net {
class IOBuffer;
class Socket;
@@ -22,10 +24,23 @@ namespace remoting {
namespace protocol {
// MessageReader reads data from the socket asynchronously and calls
-// callback for each message it receives
-class MessageReader {
+// callback for each message it receives. It stops calling the
+// callback as soon as the socket is closed, so the socket should
+// always be closed before the callback handler is destroyed.
+//
+// In order to throttle the stream, MessageReader doesn't try to read
+// new data from the socket until all previously received messages are
+// processed by the receiver (|done_task| is called for each message).
+// It is still possible that the MessageReceivedCallback is called
+// twice (so that there is more than one outstanding message),
+// e.g. when we the sender sends multiple messages in one TCP packet.
+class MessageReader : public base::RefCountedThreadSafe<MessageReader> {
public:
- typedef Callback1<CompoundBuffer*>::Type MessageReceivedCallback;
+ // The callback is given ownership of the second argument
+ // (|done_task|). The buffer (first argument) is owned by
+ // MessageReader and is freed when the task specified by the second
+ // argument is called.
+ typedef Callback2<CompoundBuffer*, Task*>::Type MessageReceivedCallback;
MessageReader();
virtual ~MessageReader();
@@ -39,9 +54,23 @@ class MessageReader {
void OnRead(int result);
void HandleReadResult(int result);
void OnDataReceived(net::IOBuffer* data, int data_size);
+ void OnMessageDone(CompoundBuffer* message);
+ void ProcessDoneEvent();
net::Socket* socket_;
+ // The network message loop this object runs on.
+ MessageLoop* message_loop_;
+
+ // Set to true, when we have a socket read pending, and expecting
+ // OnRead() to be called when new data is received.
+ bool read_pending_;
+
+ // Number of messages that we received, but haven't finished
+ // processing yet, i.e. |done_task| hasn't been called for these
+ // messages.
+ int pending_messages_;
+
bool closed_;
scoped_refptr<net::IOBuffer> read_buffer_;
net::CompletionCallbackImpl<MessageReader> read_callback_;
@@ -52,33 +81,46 @@ class MessageReader {
scoped_ptr<MessageReceivedCallback> message_received_callback_;
};
+// Version of MessageReader for protocol buffer messages, that parses
+// each incoming message.
template <class T>
class ProtobufMessageReader {
public:
- typedef typename Callback1<T*>::Type MessageReceivedCallback;
+ typedef typename Callback2<T*, Task*>::Type MessageReceivedCallback;
ProtobufMessageReader() { };
~ProtobufMessageReader() { };
void Init(net::Socket* socket, MessageReceivedCallback* callback) {
message_received_callback_.reset(callback);
- message_reader_.Init(
+ message_reader_ = new MessageReader();
+ message_reader_->Init(
socket, NewCallback(this, &ProtobufMessageReader<T>::OnNewData));
}
private:
- void OnNewData(CompoundBuffer* buffer) {
+ void OnNewData(CompoundBuffer* buffer, Task* done_task) {
T* message = new T();
CompoundBufferInputStream stream(buffer);
bool ret = message->ParseFromZeroCopyStream(&stream);
if (!ret) {
+ LOG(WARNING) << "Received message that is not a valid protocol buffer.";
delete message;
} else {
- message_received_callback_->Run(message);
+ DCHECK_EQ(stream.position(), buffer->total_bytes());
+ message_received_callback_->Run(
+ message, NewRunnableFunction(
+ &ProtobufMessageReader<T>::OnDone, message, done_task));
}
}
- MessageReader message_reader_;
+ static void OnDone(T* message, Task* done_task) {
+ delete message;
+ done_task->Run();
+ delete done_task;
+ }
+
+ scoped_refptr<MessageReader> message_reader_;
scoped_ptr<MessageReceivedCallback> message_received_callback_;
};
« no previous file with comments | « remoting/protocol/message_decoder_unittest.cc ('k') | remoting/protocol/message_reader.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698