| Index: remoting/protocol/message_reader.h
|
| diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h
|
| index 85226300b34e8476207718ca04498cce0400fe64..d4937780590fd5bff33d29bfb203f354c9120c84 100644
|
| --- a/remoting/protocol/message_reader.h
|
| +++ b/remoting/protocol/message_reader.h
|
| @@ -10,87 +10,35 @@
|
| #include "base/scoped_ptr.h"
|
| #include "base/task.h"
|
| #include "net/base/completion_callback.h"
|
| -#include "net/base/io_buffer.h"
|
| +#include "remoting/base/compound_buffer.h"
|
| #include "remoting/protocol/message_decoder.h"
|
|
|
| namespace net {
|
| +class IOBuffer;
|
| class Socket;
|
| } // namespace net
|
|
|
| namespace remoting {
|
| namespace protocol {
|
|
|
| -class MessageReader;
|
| -
|
| -namespace internal {
|
| -
|
| -template <class T>
|
| -class MessageReaderPrivate {
|
| - private:
|
| - friend class remoting::protocol::MessageReader;
|
| -
|
| - typedef typename Callback1<T*>::Type MessageReceivedCallback;
|
| -
|
| - MessageReaderPrivate(MessageReceivedCallback* callback)
|
| - : message_received_callback_(callback) {
|
| - }
|
| -
|
| - ~MessageReaderPrivate() { }
|
| -
|
| - void OnDataReceived(net::IOBuffer* buffer, int data_size) {
|
| - typedef typename std::list<T*>::iterator MessageListIterator;
|
| -
|
| - std::list<T*> message_list;
|
| - message_decoder_.ParseMessages(buffer, data_size, &message_list);
|
| - for (MessageListIterator it = message_list.begin();
|
| - it != message_list.end(); ++it) {
|
| - message_received_callback_->Run(*it);
|
| - }
|
| - }
|
| -
|
| - void Destroy() {
|
| - delete this;
|
| - }
|
| -
|
| - // Message decoder is used to decode bytes into protobuf message.
|
| - MessageDecoder message_decoder_;
|
| -
|
| - // Callback is called when a message is received.
|
| - scoped_ptr<MessageReceivedCallback> message_received_callback_;
|
| -};
|
| -
|
| -} // namespace internal
|
| -
|
| -// MessageReader reads data from the socket asynchronously and uses
|
| -// MessageReaderPrivate to decode the data received.
|
| +// MessageReader reads data from the socket asynchronously and calls
|
| +// callback for each message it receives
|
| class MessageReader {
|
| public:
|
| + typedef Callback1<CompoundBuffer*>::Type MessageReceivedCallback;
|
| +
|
| MessageReader();
|
| virtual ~MessageReader();
|
|
|
| - // Stops reading. Must be called on the same thread as Init().
|
| - void Close();
|
| -
|
| // Initialize the MessageReader with a socket. If a message is received
|
| // |callback| is called.
|
| - template <class T>
|
| - void Init(net::Socket* socket, typename Callback1<T*>::Type* callback) {
|
| - internal::MessageReaderPrivate<T>* reader =
|
| - new internal::MessageReaderPrivate<T>(callback);
|
| - data_received_callback_.reset(
|
| - ::NewCallback(
|
| - reader, &internal::MessageReaderPrivate<T>::OnDataReceived));
|
| - destruction_callback_.reset(
|
| - ::NewCallback(reader, &internal::MessageReaderPrivate<T>::Destroy));
|
| - Init(socket);
|
| - }
|
| + void Init(net::Socket* socket, MessageReceivedCallback* callback);
|
|
|
| private:
|
| - void Init(net::Socket* socket);
|
| -
|
| void DoRead();
|
| void OnRead(int result);
|
| void HandleReadResult(int result);
|
| + void OnDataReceived(net::IOBuffer* data, int data_size);
|
|
|
| net::Socket* socket_;
|
|
|
| @@ -98,8 +46,40 @@ class MessageReader {
|
| scoped_refptr<net::IOBuffer> read_buffer_;
|
| net::CompletionCallbackImpl<MessageReader> read_callback_;
|
|
|
| - scoped_ptr<Callback2<net::IOBuffer*, int>::Type> data_received_callback_;
|
| - scoped_ptr<Callback0::Type> destruction_callback_;
|
| + MessageDecoder message_decoder_;
|
| +
|
| + // Callback is called when a message is received.
|
| + scoped_ptr<MessageReceivedCallback> message_received_callback_;
|
| +};
|
| +
|
| +template <class T>
|
| +class ProtobufMessageReader {
|
| + public:
|
| + typedef typename Callback1<T*>::Type MessageReceivedCallback;
|
| +
|
| + ProtobufMessageReader() { };
|
| + ~ProtobufMessageReader() { };
|
| +
|
| + void Init(net::Socket* socket, MessageReceivedCallback* callback) {
|
| + message_received_callback_.reset(callback);
|
| + message_reader_.Init(
|
| + socket, NewCallback(this, &ProtobufMessageReader<T>::OnNewData));
|
| + }
|
| +
|
| + private:
|
| + void OnNewData(CompoundBuffer* buffer) {
|
| + T* message = new T();
|
| + CompoundBufferInputStream stream(buffer);
|
| + bool ret = message->ParseFromZeroCopyStream(&stream);
|
| + if (!ret) {
|
| + delete message;
|
| + } else {
|
| + message_received_callback_->Run(message);
|
| + }
|
| + }
|
| +
|
| + MessageReader message_reader_;
|
| + scoped_ptr<MessageReceivedCallback> message_received_callback_;
|
| };
|
|
|
| } // namespace protocol
|
|
|