OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_ | 5 #ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_ |
6 #define REMOTING_PROTOCOL_MESSAGE_READER_H_ | 6 #define REMOTING_PROTOCOL_MESSAGE_READER_H_ |
7 | 7 |
8 #include "base/bind.h" | |
9 #include "base/callback.h" | 8 #include "base/callback.h" |
10 #include "base/memory/scoped_ptr.h" | 9 #include "base/memory/scoped_ptr.h" |
11 #include "base/memory/weak_ptr.h" | 10 #include "base/memory/weak_ptr.h" |
12 #include "base/threading/non_thread_safe.h" | 11 #include "base/threading/non_thread_safe.h" |
13 #include "net/base/completion_callback.h" | |
14 #include "remoting/base/compound_buffer.h" | 12 #include "remoting/base/compound_buffer.h" |
15 #include "remoting/protocol/message_decoder.h" | 13 #include "remoting/protocol/message_decoder.h" |
16 | 14 |
17 namespace net { | 15 namespace net { |
18 class IOBuffer; | 16 class IOBuffer; |
19 class Socket; | 17 class Socket; |
20 } // namespace net | 18 } // namespace net |
21 | 19 |
22 namespace remoting { | 20 namespace remoting { |
23 namespace protocol { | 21 namespace protocol { |
24 | 22 |
25 // MessageReader reads data from the socket asynchronously and calls | 23 // MessageReader reads data from the socket asynchronously and calls |
26 // callback for each message it receives. It stops calling the | 24 // callback for each message it receives. It stops calling the |
27 // callback as soon as the socket is closed, so the socket should | 25 // callback as soon as the socket is closed, so the socket should |
28 // always be closed before the callback handler is destroyed. | 26 // always be closed before the callback handler is destroyed. |
29 // | 27 // |
30 // In order to throttle the stream, MessageReader doesn't try to read | 28 // In order to throttle the stream, MessageReader doesn't try to read |
31 // new data from the socket until all previously received messages are | 29 // new data from the socket until all previously received messages are |
32 // processed by the receiver (|done_task| is called for each message). | 30 // processed by the receiver (|done_task| is called for each message). |
33 // It is still possible that the MessageReceivedCallback is called | 31 // It is still possible that the MessageReceivedCallback is called |
34 // twice (so that there is more than one outstanding message), | 32 // twice (so that there is more than one outstanding message), |
35 // e.g. when we the sender sends multiple messages in one TCP packet. | 33 // e.g. when we the sender sends multiple messages in one TCP packet. |
36 class MessageReader : public base::NonThreadSafe { | 34 class MessageReader : public base::NonThreadSafe { |
37 public: | 35 public: |
38 typedef base::Callback<void(scoped_ptr<CompoundBuffer>, const base::Closure&)> | 36 typedef base::Callback<void(scoped_ptr<CompoundBuffer>, const base::Closure&)> |
39 MessageReceivedCallback; | 37 MessageReceivedCallback; |
40 | 38 |
41 MessageReader(); | 39 explicit MessageReader(); |
rmsousa
2015/01/08 02:22:31
nit: explicit with zero arguments?
Sergey Ulanov
2015/01/08 19:20:32
Done.
| |
42 virtual ~MessageReader(); | 40 virtual ~MessageReader(); |
43 | 41 |
44 // Initialize the MessageReader with a socket. If a message is received | 42 // Sets the callback to be called for each incoming message. |
45 // |callback| is called. | 43 void SetMessageReceivedCallback(const MessageReceivedCallback& callback); |
46 void Init(net::Socket* socket, const MessageReceivedCallback& callback); | 44 |
45 // Starts reading from |socket|. | |
46 void StartReading(net::Socket* socket); | |
47 | 47 |
48 private: | 48 private: |
49 void DoRead(); | 49 void DoRead(); |
50 void OnRead(int result); | 50 void OnRead(int result); |
51 void HandleReadResult(int result); | 51 void HandleReadResult(int result); |
52 void OnDataReceived(net::IOBuffer* data, int data_size); | 52 void OnDataReceived(net::IOBuffer* data, int data_size); |
53 void RunCallback(scoped_ptr<CompoundBuffer> message); | 53 void RunCallback(scoped_ptr<CompoundBuffer> message); |
54 void OnMessageDone(); | 54 void OnMessageDone(); |
55 | 55 |
56 net::Socket* socket_; | 56 net::Socket* socket_; |
(...skipping 13 matching lines...) Expand all Loading... | |
70 MessageDecoder message_decoder_; | 70 MessageDecoder message_decoder_; |
71 | 71 |
72 // Callback is called when a message is received. | 72 // Callback is called when a message is received. |
73 MessageReceivedCallback message_received_callback_; | 73 MessageReceivedCallback message_received_callback_; |
74 | 74 |
75 base::WeakPtrFactory<MessageReader> weak_factory_; | 75 base::WeakPtrFactory<MessageReader> weak_factory_; |
76 | 76 |
77 DISALLOW_COPY_AND_ASSIGN(MessageReader); | 77 DISALLOW_COPY_AND_ASSIGN(MessageReader); |
78 }; | 78 }; |
79 | 79 |
80 // Version of MessageReader for protocol buffer messages, that parses | |
81 // each incoming message. | |
82 template <class T> | |
83 class ProtobufMessageReader { | |
84 public: | |
85 // The callback that is called when a new message is received. |done_task| | |
86 // must be called by the callback when it's done processing the |message|. | |
87 typedef typename base::Callback<void(scoped_ptr<T> message, | |
88 const base::Closure& done_task)> | |
89 MessageReceivedCallback; | |
90 | |
91 ProtobufMessageReader() { }; | |
92 ~ProtobufMessageReader() { }; | |
93 | |
94 void Init(net::Socket* socket, const MessageReceivedCallback& callback) { | |
95 DCHECK(!callback.is_null()); | |
96 message_received_callback_ = callback; | |
97 message_reader_.reset(new MessageReader()); | |
98 message_reader_->Init( | |
99 socket, base::Bind(&ProtobufMessageReader<T>::OnNewData, | |
100 base::Unretained(this))); | |
101 } | |
102 | |
103 private: | |
104 void OnNewData(scoped_ptr<CompoundBuffer> buffer, | |
105 const base::Closure& done_task) { | |
106 scoped_ptr<T> message(new T()); | |
107 CompoundBufferInputStream stream(buffer.get()); | |
108 bool ret = message->ParseFromZeroCopyStream(&stream); | |
109 if (!ret) { | |
110 LOG(WARNING) << "Received message that is not a valid protocol buffer."; | |
111 } else { | |
112 DCHECK_EQ(stream.position(), buffer->total_bytes()); | |
113 message_received_callback_.Run(message.Pass(), done_task); | |
114 } | |
115 } | |
116 | |
117 scoped_ptr<MessageReader> message_reader_; | |
118 MessageReceivedCallback message_received_callback_; | |
119 }; | |
120 | |
121 } // namespace protocol | 80 } // namespace protocol |
122 } // namespace remoting | 81 } // namespace remoting |
123 | 82 |
124 #endif // REMOTING_PROTOCOL_MESSAGE_READER_H_ | 83 #endif // REMOTING_PROTOCOL_MESSAGE_READER_H_ |
OLD | NEW |