Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: remoting/protocol/message_reader.h

Issue 9827006: Refactor VideoStub interface to accept ownership of video packets. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « remoting/protocol/host_event_dispatcher.cc ('k') | remoting/protocol/message_reader.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_ 5 #ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_
6 #define REMOTING_PROTOCOL_MESSAGE_READER_H_ 6 #define REMOTING_PROTOCOL_MESSAGE_READER_H_
7 7
8 #include "base/bind.h" 8 #include "base/bind.h"
9 #include "base/callback.h" 9 #include "base/callback.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
(...skipping 19 matching lines...) Expand all
30 // always be closed before the callback handler is destroyed. 30 // always be closed before the callback handler is destroyed.
31 // 31 //
32 // In order to throttle the stream, MessageReader doesn't try to read 32 // In order to throttle the stream, MessageReader doesn't try to read
33 // new data from the socket until all previously received messages are 33 // new data from the socket until all previously received messages are
34 // processed by the receiver (|done_task| is called for each message). 34 // processed by the receiver (|done_task| is called for each message).
35 // It is still possible that the MessageReceivedCallback is called 35 // It is still possible that the MessageReceivedCallback is called
36 // twice (so that there is more than one outstanding message), 36 // twice (so that there is more than one outstanding message),
37 // e.g. when we the sender sends multiple messages in one TCP packet. 37 // e.g. when we the sender sends multiple messages in one TCP packet.
38 class MessageReader : public base::RefCountedThreadSafe<MessageReader> { 38 class MessageReader : public base::RefCountedThreadSafe<MessageReader> {
39 public: 39 public:
40 // The callback is given ownership of the second argument 40 typedef base::Callback<void(scoped_ptr<CompoundBuffer>, const base::Closure&)>
41 // (|done_task|). The buffer (first argument) is owned by
42 // MessageReader and is freed when the task specified by the second
43 // argument is called.
44 typedef base::Callback<void(CompoundBuffer*, const base::Closure&)>
45 MessageReceivedCallback; 41 MessageReceivedCallback;
46 42
47 MessageReader(); 43 MessageReader();
48 virtual ~MessageReader(); 44 virtual ~MessageReader();
49 45
50 // Initialize the MessageReader with a socket. If a message is received 46 // Initialize the MessageReader with a socket. If a message is received
51 // |callback| is called. 47 // |callback| is called.
52 void Init(net::Socket* socket, const MessageReceivedCallback& callback); 48 void Init(net::Socket* socket, const MessageReceivedCallback& callback);
53 49
54 private: 50 private:
55 void DoRead(); 51 void DoRead();
56 void OnRead(int result); 52 void OnRead(int result);
57 void HandleReadResult(int result); 53 void HandleReadResult(int result);
58 void OnDataReceived(net::IOBuffer* data, int data_size); 54 void OnDataReceived(net::IOBuffer* data, int data_size);
59 void OnMessageDone(CompoundBuffer* message, 55 void OnMessageDone(scoped_refptr<base::MessageLoopProxy> message_loop);
60 scoped_refptr<base::MessageLoopProxy> message_loop);
61 void ProcessDoneEvent(); 56 void ProcessDoneEvent();
62 57
63 net::Socket* socket_; 58 net::Socket* socket_;
64 59
65 // Set to true, when we have a socket read pending, and expecting 60 // Set to true, when we have a socket read pending, and expecting
66 // OnRead() to be called when new data is received. 61 // OnRead() to be called when new data is received.
67 bool read_pending_; 62 bool read_pending_;
68 63
69 // Number of messages that we received, but haven't finished 64 // Number of messages that we received, but haven't finished
70 // processing yet, i.e. |done_task| hasn't been called for these 65 // processing yet, i.e. |done_task| hasn't been called for these
71 // messages. 66 // messages.
72 int pending_messages_; 67 int pending_messages_;
73 68
74 bool closed_; 69 bool closed_;
75 scoped_refptr<net::IOBuffer> read_buffer_; 70 scoped_refptr<net::IOBuffer> read_buffer_;
76 71
77 MessageDecoder message_decoder_; 72 MessageDecoder message_decoder_;
78 73
79 // Callback is called when a message is received. 74 // Callback is called when a message is received.
80 MessageReceivedCallback message_received_callback_; 75 MessageReceivedCallback message_received_callback_;
81 }; 76 };
82 77
83 // Version of MessageReader for protocol buffer messages, that parses 78 // Version of MessageReader for protocol buffer messages, that parses
84 // each incoming message. 79 // each incoming message.
85 template <class T> 80 template <class T>
86 class ProtobufMessageReader { 81 class ProtobufMessageReader {
87 public: 82 public:
88 typedef typename base::Callback<void(T*, const base::Closure&)> 83 typedef typename base::Callback<void(scoped_ptr<T>, const base::Closure&)>
89 MessageReceivedCallback; 84 MessageReceivedCallback;
90 85
91 ProtobufMessageReader() { }; 86 ProtobufMessageReader() { };
92 ~ProtobufMessageReader() { }; 87 ~ProtobufMessageReader() { };
93 88
94 void Init(net::Socket* socket, const MessageReceivedCallback& callback) { 89 void Init(net::Socket* socket, const MessageReceivedCallback& callback) {
95 DCHECK(!callback.is_null()); 90 DCHECK(!callback.is_null());
96 message_received_callback_ = callback; 91 message_received_callback_ = callback;
97 message_reader_ = new MessageReader(); 92 message_reader_ = new MessageReader();
98 message_reader_->Init( 93 message_reader_->Init(
99 socket, base::Bind(&ProtobufMessageReader<T>::OnNewData, 94 socket, base::Bind(&ProtobufMessageReader<T>::OnNewData,
100 base::Unretained(this))); 95 base::Unretained(this)));
101 } 96 }
102 97
103 private: 98 private:
104 void OnNewData(CompoundBuffer* buffer, const base::Closure& done_task) { 99 void OnNewData(scoped_ptr<CompoundBuffer> buffer,
105 T* message = new T(); 100 const base::Closure& done_task) {
106 CompoundBufferInputStream stream(buffer); 101 scoped_ptr<T> message(new T());
102 CompoundBufferInputStream stream(buffer.get());
107 bool ret = message->ParseFromZeroCopyStream(&stream); 103 bool ret = message->ParseFromZeroCopyStream(&stream);
108 if (!ret) { 104 if (!ret) {
109 LOG(WARNING) << "Received message that is not a valid protocol buffer."; 105 LOG(WARNING) << "Received message that is not a valid protocol buffer.";
110 delete message;
111 } else { 106 } else {
112 DCHECK_EQ(stream.position(), buffer->total_bytes()); 107 DCHECK_EQ(stream.position(), buffer->total_bytes());
113 message_received_callback_.Run(message, base::Bind( 108 message_received_callback_.Run(message.Pass(), done_task);
114 &ProtobufMessageReader<T>::OnDone, message, done_task));
115 } 109 }
116 } 110 }
117 111
118 static void OnDone(T* message, const base::Closure& done_task) {
119 delete message;
120 done_task.Run();
121 }
122
123 scoped_refptr<MessageReader> message_reader_; 112 scoped_refptr<MessageReader> message_reader_;
124 MessageReceivedCallback message_received_callback_; 113 MessageReceivedCallback message_received_callback_;
125 }; 114 };
126 115
127 } // namespace protocol 116 } // namespace protocol
128 } // namespace remoting 117 } // namespace remoting
129 118
130 #endif // REMOTING_PROTOCOL_MESSAGE_READER_H_ 119 #endif // REMOTING_PROTOCOL_MESSAGE_READER_H_
OLDNEW
« no previous file with comments | « remoting/protocol/host_event_dispatcher.cc ('k') | remoting/protocol/message_reader.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698