| Index: remoting/protocol/message_reader.h
|
| diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h
|
| index d4937780590fd5bff33d29bfb203f354c9120c84..d14cb90b8a651f812454513faac1455f0a3e0319 100644
|
| --- a/remoting/protocol/message_reader.h
|
| +++ b/remoting/protocol/message_reader.h
|
| @@ -13,6 +13,8 @@
|
| #include "remoting/base/compound_buffer.h"
|
| #include "remoting/protocol/message_decoder.h"
|
|
|
| +class MessageLoop;
|
| +
|
| namespace net {
|
| class IOBuffer;
|
| class Socket;
|
| @@ -22,10 +24,23 @@ namespace remoting {
|
| namespace protocol {
|
|
|
| // MessageReader reads data from the socket asynchronously and calls
|
| -// callback for each message it receives
|
| -class MessageReader {
|
| +// callback for each message it receives. It stops calling the
|
| +// callback as soon as the socket is closed, so the socket should
|
| +// always be closed before the callback handler is destroyed.
|
| +//
|
| +// In order to throttle the stream, MessageReader doesn't try to read
|
| +// new data from the socket until all previously received messages are
|
| +// processed by the receiver (|done_task| is called for each message).
|
| +// It is still possible that the MessageReceivedCallback is called
|
| +// twice (so that there is more than one outstanding message),
|
| +// e.g. when we the sender sends multiple messages in one TCP packet.
|
| +class MessageReader : public base::RefCountedThreadSafe<MessageReader> {
|
| public:
|
| - typedef Callback1<CompoundBuffer*>::Type MessageReceivedCallback;
|
| + // The callback is given ownership of the second argument
|
| + // (|done_task|). The buffer (first argument) is owned by
|
| + // MessageReader and is freed when the task specified by the second
|
| + // argument is called.
|
| + typedef Callback2<CompoundBuffer*, Task*>::Type MessageReceivedCallback;
|
|
|
| MessageReader();
|
| virtual ~MessageReader();
|
| @@ -39,9 +54,23 @@ class MessageReader {
|
| void OnRead(int result);
|
| void HandleReadResult(int result);
|
| void OnDataReceived(net::IOBuffer* data, int data_size);
|
| + void OnMessageDone(CompoundBuffer* message);
|
| + void ProcessDoneEvent();
|
|
|
| net::Socket* socket_;
|
|
|
| + // The network message loop this object runs on.
|
| + MessageLoop* message_loop_;
|
| +
|
| + // Set to true, when we have a socket read pending, and expecting
|
| + // OnRead() to be called when new data is received.
|
| + bool read_pending_;
|
| +
|
| + // Number of messages that we received, but haven't finished
|
| + // processing yet, i.e. |done_task| hasn't been called for these
|
| + // messages.
|
| + int pending_messages_;
|
| +
|
| bool closed_;
|
| scoped_refptr<net::IOBuffer> read_buffer_;
|
| net::CompletionCallbackImpl<MessageReader> read_callback_;
|
| @@ -52,33 +81,46 @@ class MessageReader {
|
| scoped_ptr<MessageReceivedCallback> message_received_callback_;
|
| };
|
|
|
| +// Version of MessageReader for protocol buffer messages, that parses
|
| +// each incoming message.
|
| template <class T>
|
| class ProtobufMessageReader {
|
| public:
|
| - typedef typename Callback1<T*>::Type MessageReceivedCallback;
|
| + typedef typename Callback2<T*, Task*>::Type MessageReceivedCallback;
|
|
|
| ProtobufMessageReader() { };
|
| ~ProtobufMessageReader() { };
|
|
|
| void Init(net::Socket* socket, MessageReceivedCallback* callback) {
|
| message_received_callback_.reset(callback);
|
| - message_reader_.Init(
|
| + message_reader_ = new MessageReader();
|
| + message_reader_->Init(
|
| socket, NewCallback(this, &ProtobufMessageReader<T>::OnNewData));
|
| }
|
|
|
| private:
|
| - void OnNewData(CompoundBuffer* buffer) {
|
| + void OnNewData(CompoundBuffer* buffer, Task* done_task) {
|
| T* message = new T();
|
| CompoundBufferInputStream stream(buffer);
|
| bool ret = message->ParseFromZeroCopyStream(&stream);
|
| if (!ret) {
|
| + LOG(WARNING) << "Received message that is not a valid protocol buffer.";
|
| delete message;
|
| } else {
|
| - message_received_callback_->Run(message);
|
| + DCHECK_EQ(stream.position(), buffer->total_bytes());
|
| + message_received_callback_->Run(
|
| + message, NewRunnableFunction(
|
| + &ProtobufMessageReader<T>::OnDone, message, done_task));
|
| }
|
| }
|
|
|
| - MessageReader message_reader_;
|
| + static void OnDone(T* message, Task* done_task) {
|
| + delete message;
|
| + done_task->Run();
|
| + delete done_task;
|
| + }
|
| +
|
| + scoped_refptr<MessageReader> message_reader_;
|
| scoped_ptr<MessageReceivedCallback> message_received_callback_;
|
| };
|
|
|
|
|