Index: remoting/protocol/message_reader.h |
diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h |
index 85226300b34e8476207718ca04498cce0400fe64..d4937780590fd5bff33d29bfb203f354c9120c84 100644 |
--- a/remoting/protocol/message_reader.h |
+++ b/remoting/protocol/message_reader.h |
@@ -10,87 +10,35 @@ |
#include "base/scoped_ptr.h" |
#include "base/task.h" |
#include "net/base/completion_callback.h" |
-#include "net/base/io_buffer.h" |
+#include "remoting/base/compound_buffer.h" |
#include "remoting/protocol/message_decoder.h" |
namespace net { |
+class IOBuffer; |
class Socket; |
} // namespace net |
namespace remoting { |
namespace protocol { |
-class MessageReader; |
- |
-namespace internal { |
- |
-template <class T> |
-class MessageReaderPrivate { |
- private: |
- friend class remoting::protocol::MessageReader; |
- |
- typedef typename Callback1<T*>::Type MessageReceivedCallback; |
- |
- MessageReaderPrivate(MessageReceivedCallback* callback) |
- : message_received_callback_(callback) { |
- } |
- |
- ~MessageReaderPrivate() { } |
- |
- void OnDataReceived(net::IOBuffer* buffer, int data_size) { |
- typedef typename std::list<T*>::iterator MessageListIterator; |
- |
- std::list<T*> message_list; |
- message_decoder_.ParseMessages(buffer, data_size, &message_list); |
- for (MessageListIterator it = message_list.begin(); |
- it != message_list.end(); ++it) { |
- message_received_callback_->Run(*it); |
- } |
- } |
- |
- void Destroy() { |
- delete this; |
- } |
- |
- // Message decoder is used to decode bytes into protobuf message. |
- MessageDecoder message_decoder_; |
- |
- // Callback is called when a message is received. |
- scoped_ptr<MessageReceivedCallback> message_received_callback_; |
-}; |
- |
-} // namespace internal |
- |
-// MessageReader reads data from the socket asynchronously and uses |
-// MessageReaderPrivate to decode the data received. |
+// MessageReader reads data from the socket asynchronously and calls |
+// callback for each message it receives |
class MessageReader { |
public: |
+ typedef Callback1<CompoundBuffer*>::Type MessageReceivedCallback; |
+ |
MessageReader(); |
virtual ~MessageReader(); |
- // Stops reading. Must be called on the same thread as Init(). |
- void Close(); |
- |
// Initialize the MessageReader with a socket. If a message is received |
// |callback| is called. |
- template <class T> |
- void Init(net::Socket* socket, typename Callback1<T*>::Type* callback) { |
- internal::MessageReaderPrivate<T>* reader = |
- new internal::MessageReaderPrivate<T>(callback); |
- data_received_callback_.reset( |
- ::NewCallback( |
- reader, &internal::MessageReaderPrivate<T>::OnDataReceived)); |
- destruction_callback_.reset( |
- ::NewCallback(reader, &internal::MessageReaderPrivate<T>::Destroy)); |
- Init(socket); |
- } |
+ void Init(net::Socket* socket, MessageReceivedCallback* callback); |
private: |
- void Init(net::Socket* socket); |
- |
void DoRead(); |
void OnRead(int result); |
void HandleReadResult(int result); |
+ void OnDataReceived(net::IOBuffer* data, int data_size); |
net::Socket* socket_; |
@@ -98,8 +46,40 @@ class MessageReader { |
scoped_refptr<net::IOBuffer> read_buffer_; |
net::CompletionCallbackImpl<MessageReader> read_callback_; |
- scoped_ptr<Callback2<net::IOBuffer*, int>::Type> data_received_callback_; |
- scoped_ptr<Callback0::Type> destruction_callback_; |
+ MessageDecoder message_decoder_; |
+ |
+ // Callback is called when a message is received. |
+ scoped_ptr<MessageReceivedCallback> message_received_callback_; |
+}; |
+ |
+template <class T> |
+class ProtobufMessageReader { |
+ public: |
+ typedef typename Callback1<T*>::Type MessageReceivedCallback; |
+ |
+ ProtobufMessageReader() { }; |
+ ~ProtobufMessageReader() { }; |
+ |
+ void Init(net::Socket* socket, MessageReceivedCallback* callback) { |
+ message_received_callback_.reset(callback); |
+ message_reader_.Init( |
+ socket, NewCallback(this, &ProtobufMessageReader<T>::OnNewData)); |
+ } |
+ |
+ private: |
+ void OnNewData(CompoundBuffer* buffer) { |
+ T* message = new T(); |
+ CompoundBufferInputStream stream(buffer); |
+ bool ret = message->ParseFromZeroCopyStream(&stream); |
+ if (!ret) { |
+ delete message; |
+ } else { |
+ message_received_callback_->Run(message); |
+ } |
+ } |
+ |
+ MessageReader message_reader_; |
+ scoped_ptr<MessageReceivedCallback> message_received_callback_; |
}; |
} // namespace protocol |