Index: remoting/protocol/message_reader.h |
diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h |
index d4937780590fd5bff33d29bfb203f354c9120c84..d14cb90b8a651f812454513faac1455f0a3e0319 100644 |
--- a/remoting/protocol/message_reader.h |
+++ b/remoting/protocol/message_reader.h |
@@ -13,6 +13,8 @@ |
#include "remoting/base/compound_buffer.h" |
#include "remoting/protocol/message_decoder.h" |
+class MessageLoop; |
+ |
namespace net { |
class IOBuffer; |
class Socket; |
@@ -22,10 +24,23 @@ namespace remoting { |
namespace protocol { |
// MessageReader reads data from the socket asynchronously and calls |
-// callback for each message it receives |
-class MessageReader { |
+// callback for each message it receives. It stops calling the |
+// callback as soon as the socket is closed, so the socket should |
+// always be closed before the callback handler is destroyed. |
+// |
+// In order to throttle the stream, MessageReader doesn't try to read |
+// new data from the socket until all previously received messages are |
+// processed by the receiver (|done_task| is called for each message). |
+// It is still possible that the MessageReceivedCallback is called |
+// twice (so that there is more than one outstanding message), |
+// e.g. when we the sender sends multiple messages in one TCP packet. |
+class MessageReader : public base::RefCountedThreadSafe<MessageReader> { |
public: |
- typedef Callback1<CompoundBuffer*>::Type MessageReceivedCallback; |
+ // The callback is given ownership of the second argument |
+ // (|done_task|). The buffer (first argument) is owned by |
+ // MessageReader and is freed when the task specified by the second |
+ // argument is called. |
+ typedef Callback2<CompoundBuffer*, Task*>::Type MessageReceivedCallback; |
MessageReader(); |
virtual ~MessageReader(); |
@@ -39,9 +54,23 @@ class MessageReader { |
void OnRead(int result); |
void HandleReadResult(int result); |
void OnDataReceived(net::IOBuffer* data, int data_size); |
+ void OnMessageDone(CompoundBuffer* message); |
+ void ProcessDoneEvent(); |
net::Socket* socket_; |
+ // The network message loop this object runs on. |
+ MessageLoop* message_loop_; |
+ |
+ // Set to true, when we have a socket read pending, and expecting |
+ // OnRead() to be called when new data is received. |
+ bool read_pending_; |
+ |
+ // Number of messages that we received, but haven't finished |
+ // processing yet, i.e. |done_task| hasn't been called for these |
+ // messages. |
+ int pending_messages_; |
+ |
bool closed_; |
scoped_refptr<net::IOBuffer> read_buffer_; |
net::CompletionCallbackImpl<MessageReader> read_callback_; |
@@ -52,33 +81,46 @@ class MessageReader { |
scoped_ptr<MessageReceivedCallback> message_received_callback_; |
}; |
+// Version of MessageReader for protocol buffer messages, that parses |
+// each incoming message. |
template <class T> |
class ProtobufMessageReader { |
public: |
- typedef typename Callback1<T*>::Type MessageReceivedCallback; |
+ typedef typename Callback2<T*, Task*>::Type MessageReceivedCallback; |
ProtobufMessageReader() { }; |
~ProtobufMessageReader() { }; |
void Init(net::Socket* socket, MessageReceivedCallback* callback) { |
message_received_callback_.reset(callback); |
- message_reader_.Init( |
+ message_reader_ = new MessageReader(); |
+ message_reader_->Init( |
socket, NewCallback(this, &ProtobufMessageReader<T>::OnNewData)); |
} |
private: |
- void OnNewData(CompoundBuffer* buffer) { |
+ void OnNewData(CompoundBuffer* buffer, Task* done_task) { |
T* message = new T(); |
CompoundBufferInputStream stream(buffer); |
bool ret = message->ParseFromZeroCopyStream(&stream); |
if (!ret) { |
+ LOG(WARNING) << "Received message that is not a valid protocol buffer."; |
delete message; |
} else { |
- message_received_callback_->Run(message); |
+ DCHECK_EQ(stream.position(), buffer->total_bytes()); |
+ message_received_callback_->Run( |
+ message, NewRunnableFunction( |
+ &ProtobufMessageReader<T>::OnDone, message, done_task)); |
} |
} |
- MessageReader message_reader_; |
+ static void OnDone(T* message, Task* done_task) { |
+ delete message; |
+ done_task->Run(); |
+ delete done_task; |
+ } |
+ |
+ scoped_refptr<MessageReader> message_reader_; |
scoped_ptr<MessageReceivedCallback> message_received_callback_; |
}; |