Index: remoting/protocol/message_reader.h |
diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h |
index d4937780590fd5bff33d29bfb203f354c9120c84..d0953de9f7d43d732d2d8a5f5b4251534d4bfa2e 100644 |
--- a/remoting/protocol/message_reader.h |
+++ b/remoting/protocol/message_reader.h |
@@ -25,7 +25,11 @@ namespace protocol { |
// callback for each message it receives |
awong
2011/01/20 20:06:38
We should document the flow control semantics here
Sergey Ulanov
2011/01/20 21:55:57
Added more comments
|
class MessageReader { |
public: |
- typedef Callback1<CompoundBuffer*>::Type MessageReceivedCallback; |
+ // The callback is given ownership of the second argument |
+ // (|done_task|). The buffer (first argument) is owned by |
+ // MessageReader and is freed when the task specified by the second |
+ // argument is called. |
awong
2011/01/20 20:06:38
Nice comment!
Thanks for being disciplined with d
|
+ typedef Callback2<CompoundBuffer*, Task*>::Type MessageReceivedCallback; |
MessageReader(); |
virtual ~MessageReader(); |
@@ -35,27 +39,37 @@ class MessageReader { |
void Init(net::Socket* socket, MessageReceivedCallback* callback); |
private: |
+ class DoneTaskHandler; |
+ friend class DoneTaskHandler; |
+ |
void DoRead(); |
void OnRead(int result); |
void HandleReadResult(int result); |
void OnDataReceived(net::IOBuffer* data, int data_size); |
+ void OnMessageDone(); |
net::Socket* socket_; |
+ bool read_pending_; |
Alpha Left Google
2011/01/20 20:54:52
Please add some comments for these variables, it w
Sergey Ulanov
2011/01/20 21:55:57
Done.
|
+ int pending_messages_; |
bool closed_; |
scoped_refptr<net::IOBuffer> read_buffer_; |
net::CompletionCallbackImpl<MessageReader> read_callback_; |
MessageDecoder message_decoder_; |
+ scoped_refptr<DoneTaskHandler> done_task_handler_; |
+ |
// Callback is called when a message is received. |
scoped_ptr<MessageReceivedCallback> message_received_callback_; |
}; |
+// Version of MessageReader for protocol buffer messages, that parses |
+// each incoming message. |
template <class T> |
class ProtobufMessageReader { |
public: |
- typedef typename Callback1<T*>::Type MessageReceivedCallback; |
+ typedef typename Callback2<T*, Task*>::Type MessageReceivedCallback; |
ProtobufMessageReader() { }; |
~ProtobufMessageReader() { }; |
@@ -67,17 +81,25 @@ class ProtobufMessageReader { |
} |
private: |
- void OnNewData(CompoundBuffer* buffer) { |
+ void OnNewData(CompoundBuffer* buffer, Task* done_task) { |
T* message = new T(); |
CompoundBufferInputStream stream(buffer); |
bool ret = message->ParseFromZeroCopyStream(&stream); |
if (!ret) { |
delete message; |
} else { |
- message_received_callback_->Run(message); |
+ message_received_callback_->Run( |
+ message, NewRunnableFunction( |
+ &ProtobufMessageReader<T>::OnDone, message, done_task)); |
} |
} |
+ static void OnDone(T* message, Task* done_task) { |
+ delete message; |
+ done_task->Run(); |
+ delete done_task; |
+ } |
+ |
MessageReader message_reader_; |
scoped_ptr<MessageReceivedCallback> message_received_callback_; |
}; |