Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(57)

Side by Side Diff: remoting/protocol/buffered_socket_writer.cc

Issue 7633009: Use MessageLoopProxy for network message loop. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: - Created 9 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « remoting/protocol/buffered_socket_writer.h ('k') | remoting/protocol/client_control_sender.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/buffered_socket_writer.h" 5 #include "remoting/protocol/buffered_socket_writer.h"
6 6
7 #include "base/message_loop.h" 7 #include "base/message_loop_proxy.h"
8 #include "base/stl_util.h" 8 #include "base/stl_util.h"
9 #include "net/base/net_errors.h" 9 #include "net/base/net_errors.h"
10 10
11 namespace remoting { 11 namespace remoting {
12 namespace protocol { 12 namespace protocol {
13 13
14 class BufferedSocketWriterBase::PendingPacket { 14 class BufferedSocketWriterBase::PendingPacket {
15 public: 15 public:
16 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, Task* done_task) 16 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, Task* done_task)
17 : data_(data), 17 : data_(data),
18 done_task_(done_task) { 18 done_task_(done_task) {
19 } 19 }
20 ~PendingPacket() { 20 ~PendingPacket() {
21 if (done_task_.get()) 21 if (done_task_.get())
22 done_task_->Run(); 22 done_task_->Run();
23 } 23 }
24 24
25 net::IOBufferWithSize* data() { 25 net::IOBufferWithSize* data() {
26 return data_; 26 return data_;
27 } 27 }
28 28
29 private: 29 private:
30 scoped_refptr<net::IOBufferWithSize> data_; 30 scoped_refptr<net::IOBufferWithSize> data_;
31 scoped_ptr<Task> done_task_; 31 scoped_ptr<Task> done_task_;
32 32
33 DISALLOW_COPY_AND_ASSIGN(PendingPacket); 33 DISALLOW_COPY_AND_ASSIGN(PendingPacket);
34 }; 34 };
35 35
36 BufferedSocketWriterBase::BufferedSocketWriterBase() 36 BufferedSocketWriterBase::BufferedSocketWriterBase(
37 base::MessageLoopProxy* message_loop)
37 : buffer_size_(0), 38 : buffer_size_(0),
38 socket_(NULL), 39 socket_(NULL),
39 message_loop_(NULL), 40 message_loop_(message_loop),
40 write_pending_(false), 41 write_pending_(false),
41 ALLOW_THIS_IN_INITIALIZER_LIST( 42 ALLOW_THIS_IN_INITIALIZER_LIST(
42 written_callback_(this, &BufferedSocketWriterBase::OnWritten)), 43 written_callback_(this, &BufferedSocketWriterBase::OnWritten)),
43 closed_(false) { 44 closed_(false) {
44 } 45 }
45 46
46 BufferedSocketWriterBase::~BufferedSocketWriterBase() { } 47 BufferedSocketWriterBase::~BufferedSocketWriterBase() { }
47 48
48 void BufferedSocketWriterBase::Init(net::Socket* socket, 49 void BufferedSocketWriterBase::Init(net::Socket* socket,
49 WriteFailedCallback* callback) { 50 WriteFailedCallback* callback) {
50 // TODO(garykac) Save copy of WriteFailedCallback. 51 // TODO(garykac) Save copy of WriteFailedCallback.
51 base::AutoLock auto_lock(lock_); 52 base::AutoLock auto_lock(lock_);
52 message_loop_ = MessageLoop::current();
53 socket_ = socket; 53 socket_ = socket;
54 DCHECK(socket_); 54 DCHECK(socket_);
55 } 55 }
56 56
57 bool BufferedSocketWriterBase::Write( 57 bool BufferedSocketWriterBase::Write(
58 scoped_refptr<net::IOBufferWithSize> data, Task* done_task) { 58 scoped_refptr<net::IOBufferWithSize> data, Task* done_task) {
59 { 59 {
60 base::AutoLock auto_lock(lock_); 60 base::AutoLock auto_lock(lock_);
61 queue_.push_back(new PendingPacket(data, done_task)); 61 queue_.push_back(new PendingPacket(data, done_task));
62 buffer_size_ += data->size(); 62 buffer_size_ += data->size();
63 } 63 }
64 message_loop_->PostTask( 64 message_loop_->PostTask(
65 FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); 65 FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite));
66 return true; 66 return true;
67 } 67 }
68 68
69 void BufferedSocketWriterBase::DoWrite() { 69 void BufferedSocketWriterBase::DoWrite() {
70 DCHECK_EQ(message_loop_, MessageLoop::current()); 70 DCHECK(message_loop_->BelongsToCurrentThread());
71 DCHECK(socket_); 71 DCHECK(socket_);
72 72
73 // Don't try to write if there is another write pending. 73 // Don't try to write if there is another write pending.
74 if (write_pending_) 74 if (write_pending_)
75 return; 75 return;
76 76
77 // Don't write after Close(). 77 // Don't write after Close().
78 if (closed_) 78 if (closed_)
79 return; 79 return;
80 80
(...skipping 21 matching lines...) Expand all
102 HandleError(result); 102 HandleError(result);
103 if (write_failed_callback_.get()) 103 if (write_failed_callback_.get())
104 write_failed_callback_->Run(result); 104 write_failed_callback_->Run(result);
105 } 105 }
106 return; 106 return;
107 } 107 }
108 } 108 }
109 } 109 }
110 110
111 void BufferedSocketWriterBase::OnWritten(int result) { 111 void BufferedSocketWriterBase::OnWritten(int result) {
112 DCHECK_EQ(message_loop_, MessageLoop::current()); 112 DCHECK(message_loop_->BelongsToCurrentThread());
113 write_pending_ = false; 113 write_pending_ = false;
114 114
115 if (result < 0) { 115 if (result < 0) {
116 HandleError(result); 116 HandleError(result);
117 if (write_failed_callback_.get()) 117 if (write_failed_callback_.get())
118 write_failed_callback_->Run(result); 118 write_failed_callback_->Run(result);
119 return; 119 return;
120 } 120 }
121 121
122 { 122 {
123 base::AutoLock auto_lock(lock_); 123 base::AutoLock auto_lock(lock_);
124 AdvanceBufferPosition_Locked(result); 124 AdvanceBufferPosition_Locked(result);
125 } 125 }
126 126
127 // Schedule next write. 127 // Schedule next write.
128 message_loop_->PostTask( 128 message_loop_->PostTask(
129 FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); 129 FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite));
130 } 130 }
131 131
132 void BufferedSocketWriterBase::HandleError(int result) { 132 void BufferedSocketWriterBase::HandleError(int result) {
133 DCHECK_EQ(message_loop_, MessageLoop::current()); 133 DCHECK(message_loop_->BelongsToCurrentThread());
134 134
135 closed_ = true; 135 closed_ = true;
136 136
137 base::AutoLock auto_lock(lock_); 137 base::AutoLock auto_lock(lock_);
138 STLDeleteElements(&queue_); 138 STLDeleteElements(&queue_);
139 139
140 // Notify subclass that an error is received. 140 // Notify subclass that an error is received.
141 OnError_Locked(result); 141 OnError_Locked(result);
142 } 142 }
143 143
144 int BufferedSocketWriterBase::GetBufferSize() { 144 int BufferedSocketWriterBase::GetBufferSize() {
145 base::AutoLock auto_lock(lock_); 145 base::AutoLock auto_lock(lock_);
146 return buffer_size_; 146 return buffer_size_;
147 } 147 }
148 148
149 int BufferedSocketWriterBase::GetBufferChunks() { 149 int BufferedSocketWriterBase::GetBufferChunks() {
150 base::AutoLock auto_lock(lock_); 150 base::AutoLock auto_lock(lock_);
151 return queue_.size(); 151 return queue_.size();
152 } 152 }
153 153
154 void BufferedSocketWriterBase::Close() { 154 void BufferedSocketWriterBase::Close() {
155 DCHECK_EQ(message_loop_, MessageLoop::current()); 155 DCHECK(message_loop_->BelongsToCurrentThread());
156 closed_ = true; 156 closed_ = true;
157 } 157 }
158 158
159 void BufferedSocketWriterBase::PopQueue() { 159 void BufferedSocketWriterBase::PopQueue() {
160 // This also calls |done_task|. 160 // This also calls |done_task|.
161 delete queue_.front(); 161 delete queue_.front();
162 queue_.pop_front(); 162 queue_.pop_front();
163 } 163 }
164 164
165 BufferedSocketWriter::BufferedSocketWriter() { } 165 BufferedSocketWriter::BufferedSocketWriter(
166 base::MessageLoopProxy* message_loop)
167 : BufferedSocketWriterBase(message_loop) {
168 }
166 169
167 BufferedSocketWriter::~BufferedSocketWriter() { 170 BufferedSocketWriter::~BufferedSocketWriter() {
168 STLDeleteElements(&queue_); 171 STLDeleteElements(&queue_);
169 } 172 }
170 173
171 void BufferedSocketWriter::GetNextPacket_Locked( 174 void BufferedSocketWriter::GetNextPacket_Locked(
172 net::IOBuffer** buffer, int* size) { 175 net::IOBuffer** buffer, int* size) {
173 if (!current_buf_) { 176 if (!current_buf_) {
174 if (queue_.empty()) { 177 if (queue_.empty()) {
175 *buffer = NULL; 178 *buffer = NULL;
(...skipping 14 matching lines...) Expand all
190 if (current_buf_->BytesRemaining() == 0) { 193 if (current_buf_->BytesRemaining() == 0) {
191 PopQueue(); 194 PopQueue();
192 current_buf_ = NULL; 195 current_buf_ = NULL;
193 } 196 }
194 } 197 }
195 198
196 void BufferedSocketWriter::OnError_Locked(int result) { 199 void BufferedSocketWriter::OnError_Locked(int result) {
197 current_buf_ = NULL; 200 current_buf_ = NULL;
198 } 201 }
199 202
200 BufferedDatagramWriter::BufferedDatagramWriter() { } 203 BufferedDatagramWriter::BufferedDatagramWriter(
204 base::MessageLoopProxy* message_loop)
205 : BufferedSocketWriterBase(message_loop) {
206 }
201 BufferedDatagramWriter::~BufferedDatagramWriter() { } 207 BufferedDatagramWriter::~BufferedDatagramWriter() { }
202 208
203 void BufferedDatagramWriter::GetNextPacket_Locked( 209 void BufferedDatagramWriter::GetNextPacket_Locked(
204 net::IOBuffer** buffer, int* size) { 210 net::IOBuffer** buffer, int* size) {
205 if (queue_.empty()) { 211 if (queue_.empty()) {
206 *buffer = NULL; 212 *buffer = NULL;
207 return; // Nothing to write. 213 return; // Nothing to write.
208 } 214 }
209 *buffer = queue_.front()->data(); 215 *buffer = queue_.front()->data();
210 *size = queue_.front()->data()->size(); 216 *size = queue_.front()->data()->size();
211 } 217 }
212 218
213 void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) { 219 void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) {
214 DCHECK_EQ(written, queue_.front()->data()->size()); 220 DCHECK_EQ(written, queue_.front()->data()->size());
215 buffer_size_ -= queue_.front()->data()->size(); 221 buffer_size_ -= queue_.front()->data()->size();
216 PopQueue(); 222 PopQueue();
217 } 223 }
218 224
219 void BufferedDatagramWriter::OnError_Locked(int result) { 225 void BufferedDatagramWriter::OnError_Locked(int result) {
220 // Nothing to do here. 226 // Nothing to do here.
221 } 227 }
222 228
223 } // namespace protocol 229 } // namespace protocol
224 } // namespace remoting 230 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/buffered_socket_writer.h ('k') | remoting/protocol/client_control_sender.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698