Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: remoting/protocol/buffered_socket_writer.cc

Issue 10836030: Add unittests for BufferedSocketWriter and fix some bugs in that code. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/buffered_socket_writer.h" 5 #include "remoting/protocol/buffered_socket_writer.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
9 #include "base/stl_util.h" 10 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h"
10 #include "net/base/net_errors.h" 12 #include "net/base/net_errors.h"
11 13
12 namespace remoting { 14 namespace remoting {
13 namespace protocol { 15 namespace protocol {
14 16
15 class BufferedSocketWriterBase::PendingPacket { 17 class BufferedSocketWriterBase::PendingPacket {
simonmorris 2012/07/31 20:19:50 Maybe just make this a struct?
Sergey Ulanov 2012/07/31 21:37:53 yes, that was my intent
16 public: 18 public:
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, 19 PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
18 const base::Closure& done_task) 20 const base::Closure& done_task)
19 : data_(data), 21 : data(data),
20 done_task_(done_task) { 22 done_task(done_task) {
21 }
22 ~PendingPacket() {
23 if (!done_task_.is_null())
24 done_task_.Run();
25 } 23 }
26 24
27 net::IOBufferWithSize* data() { 25 scoped_refptr<net::IOBufferWithSize> data;
28 return data_; 26 base::Closure done_task;
29 }
30
31 private:
32 scoped_refptr<net::IOBufferWithSize> data_;
33 base::Closure done_task_;
34
35 DISALLOW_COPY_AND_ASSIGN(PendingPacket);
36 }; 27 };
37 28
38 BufferedSocketWriterBase::BufferedSocketWriterBase() 29 BufferedSocketWriterBase::BufferedSocketWriterBase()
39 : buffer_size_(0), 30 : buffer_size_(0),
40 socket_(NULL), 31 socket_(NULL),
41 write_pending_(false), 32 write_pending_(false),
42 closed_(false) { 33 closed_(false),
34 destroyed_flag_(NULL) {
43 } 35 }
44 36
45 void BufferedSocketWriterBase::Init(net::Socket* socket, 37 void BufferedSocketWriterBase::Init(net::Socket* socket,
46 const WriteFailedCallback& callback) { 38 const WriteFailedCallback& callback) {
47 DCHECK(CalledOnValidThread()); 39 DCHECK(CalledOnValidThread());
48 DCHECK(socket); 40 DCHECK(socket);
49 socket_ = socket; 41 socket_ = socket;
50 write_failed_callback_ = callback; 42 write_failed_callback_ = callback;
51 } 43 }
52 44
53 bool BufferedSocketWriterBase::Write( 45 bool BufferedSocketWriterBase::Write(
54 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { 46 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
55 DCHECK(CalledOnValidThread()); 47 DCHECK(CalledOnValidThread());
56 DCHECK(socket_); 48 DCHECK(socket_);
49 DCHECK(data.get());
57 50
58 // Don't write after Close(). 51 // Don't write after Close().
59 if (closed_) 52 if (closed_)
60 return false; 53 return false;
61 54
62 queue_.push_back(new PendingPacket(data, done_task)); 55 queue_.push_back(new PendingPacket(data, done_task));
63 buffer_size_ += data->size(); 56 buffer_size_ += data->size();
64 57
65 DoWrite(); 58 DoWrite();
66 return true; 59 return true;
(...skipping 17 matching lines...) Expand all
84 GetNextPacket(&current_packet, &current_packet_size); 77 GetNextPacket(&current_packet, &current_packet_size);
85 78
86 // Return if the queue is empty. 79 // Return if the queue is empty.
87 if (!current_packet) 80 if (!current_packet)
88 return; 81 return;
89 82
90 int result = socket_->Write( 83 int result = socket_->Write(
91 current_packet, current_packet_size, 84 current_packet, current_packet_size,
92 base::Bind(&BufferedSocketWriterBase::OnWritten, 85 base::Bind(&BufferedSocketWriterBase::OnWritten,
93 base::Unretained(this))); 86 base::Unretained(this)));
94 if (result >= 0) { 87 bool write_again = false;
95 AdvanceBufferPosition(result); 88 HandleWriteResult(result, &write_again);
89 if (!write_again)
90 return;
91 }
92 }
93
94 void BufferedSocketWriterBase::HandleWriteResult(int result,
95 bool* write_again) {
simonmorris 2012/07/31 20:19:50 Can't HandleWriteResult just return write_again?
Sergey Ulanov 2012/07/31 21:37:53 It can by, it's more readable this way because the
simonmorris 2012/07/31 22:20:13 I think this way makes the call sites unnecessaril
96 *write_again = false;
97 if (result < 0) {
98 if (result == net::ERR_IO_PENDING) {
99 write_pending_ = true;
96 } else { 100 } else {
97 if (result == net::ERR_IO_PENDING) { 101 HandleError(result);
98 write_pending_ = true; 102 if (!write_failed_callback_.is_null())
99 } else { 103 write_failed_callback_.Run(result);
100 HandleError(result); 104 }
101 if (!write_failed_callback_.is_null()) 105 return;
102 write_failed_callback_.Run(result); 106 }
103 } 107
108 base::Closure done_task = AdvanceBufferPosition(result);
109 if (!done_task.is_null()) {
110 bool destroyed = false;
111 destroyed_flag_ = &destroyed;
112 done_task.Run();
113 destroyed_flag_ = NULL;
simonmorris 2012/07/31 20:19:50 Isn't this use-after-free if the BufferedSocketWri
Sergey Ulanov 2012/07/31 21:37:53 Good catch. Fixed.
114
115 if (destroyed) {
116 // Stop doing anything if we've been destroyed by the callback.
104 return; 117 return;
105 } 118 }
106 } 119 }
120
121 *write_again = true;
107 } 122 }
108 123
109 void BufferedSocketWriterBase::OnWritten(int result) { 124 void BufferedSocketWriterBase::OnWritten(int result) {
110 DCHECK(CalledOnValidThread()); 125 DCHECK(CalledOnValidThread());
126 DCHECK(write_pending_);
111 write_pending_ = false; 127 write_pending_ = false;
112 128
113 if (result < 0) { 129 bool write_again;
114 HandleError(result); 130 HandleWriteResult(result, &write_again);
115 if (!write_failed_callback_.is_null()) 131 if (write_again)
116 write_failed_callback_.Run(result); 132 DoWrite();
117 return;
118 }
119
120 AdvanceBufferPosition(result);
121
122 DoWrite();
123 } 133 }
124 134
125 void BufferedSocketWriterBase::HandleError(int result) { 135 void BufferedSocketWriterBase::HandleError(int result) {
126 DCHECK(CalledOnValidThread()); 136 DCHECK(CalledOnValidThread());
127 137
128 closed_ = true; 138 closed_ = true;
129 139
130 STLDeleteElements(&queue_); 140 STLDeleteElements(&queue_);
131 141
132 // Notify subclass that an error is received. 142 // Notify subclass that an error is received.
133 OnError(result); 143 OnError(result);
134 } 144 }
135 145
136 int BufferedSocketWriterBase::GetBufferSize() { 146 int BufferedSocketWriterBase::GetBufferSize() {
137 return buffer_size_; 147 return buffer_size_;
138 } 148 }
139 149
140 int BufferedSocketWriterBase::GetBufferChunks() { 150 int BufferedSocketWriterBase::GetBufferChunks() {
141 return queue_.size(); 151 return queue_.size();
142 } 152 }
143 153
144 void BufferedSocketWriterBase::Close() { 154 void BufferedSocketWriterBase::Close() {
145 DCHECK(CalledOnValidThread()); 155 DCHECK(CalledOnValidThread());
146 closed_ = true; 156 closed_ = true;
147 } 157 }
148 158
149 BufferedSocketWriterBase::~BufferedSocketWriterBase() {} 159 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
160 if (destroyed_flag_)
161 *destroyed_flag_ = true;
150 162
151 void BufferedSocketWriterBase::PopQueue() { 163 STLDeleteElements(&queue_);
152 // This also calls |done_task|. 164 }
165
166 base::Closure BufferedSocketWriterBase::PopQueue() {
167 base::Closure result = queue_.front()->done_task;
153 delete queue_.front(); 168 delete queue_.front();
154 queue_.pop_front(); 169 queue_.pop_front();
170 return result;
155 } 171 }
156 172
157 BufferedSocketWriter::BufferedSocketWriter() { 173 BufferedSocketWriter::BufferedSocketWriter() {
158 } 174 }
159 175
160 void BufferedSocketWriter::GetNextPacket( 176 void BufferedSocketWriter::GetNextPacket(
161 net::IOBuffer** buffer, int* size) { 177 net::IOBuffer** buffer, int* size) {
162 if (!current_buf_) { 178 if (!current_buf_) {
163 if (queue_.empty()) { 179 if (queue_.empty()) {
164 *buffer = NULL; 180 *buffer = NULL;
165 return; // Nothing to write. 181 return; // Nothing to write.
166 } 182 }
167 current_buf_ = new net::DrainableIOBuffer( 183 current_buf_ = new net::DrainableIOBuffer(
168 queue_.front()->data(), queue_.front()->data()->size()); 184 queue_.front()->data, queue_.front()->data->size());
169 } 185 }
170 186
171 *buffer = current_buf_; 187 *buffer = current_buf_;
172 *size = current_buf_->BytesRemaining(); 188 *size = current_buf_->BytesRemaining();
173 } 189 }
174 190
175 void BufferedSocketWriter::AdvanceBufferPosition(int written) { 191 base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
176 buffer_size_ -= written; 192 buffer_size_ -= written;
177 current_buf_->DidConsume(written); 193 current_buf_->DidConsume(written);
178 194
179 if (current_buf_->BytesRemaining() == 0) { 195 if (current_buf_->BytesRemaining() == 0) {
180 PopQueue();
181 current_buf_ = NULL; 196 current_buf_ = NULL;
197 return PopQueue();
182 } 198 }
199 return base::Closure();
183 } 200 }
184 201
185 void BufferedSocketWriter::OnError(int result) { 202 void BufferedSocketWriter::OnError(int result) {
186 current_buf_ = NULL; 203 current_buf_ = NULL;
187 } 204 }
188 205
189 BufferedSocketWriter::~BufferedSocketWriter() { 206 BufferedSocketWriter::~BufferedSocketWriter() {
190 STLDeleteElements(&queue_);
191 } 207 }
192 208
193 BufferedDatagramWriter::BufferedDatagramWriter() { 209 BufferedDatagramWriter::BufferedDatagramWriter() {
194 } 210 }
195 211
196 void BufferedDatagramWriter::GetNextPacket( 212 void BufferedDatagramWriter::GetNextPacket(
197 net::IOBuffer** buffer, int* size) { 213 net::IOBuffer** buffer, int* size) {
198 if (queue_.empty()) { 214 if (queue_.empty()) {
199 *buffer = NULL; 215 *buffer = NULL;
200 return; // Nothing to write. 216 return; // Nothing to write.
201 } 217 }
202 *buffer = queue_.front()->data(); 218 *buffer = queue_.front()->data;
203 *size = queue_.front()->data()->size(); 219 *size = queue_.front()->data->size();
204 } 220 }
205 221
206 void BufferedDatagramWriter::AdvanceBufferPosition(int written) { 222 base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
207 DCHECK_EQ(written, queue_.front()->data()->size()); 223 DCHECK_EQ(written, queue_.front()->data->size());
208 buffer_size_ -= queue_.front()->data()->size(); 224 buffer_size_ -= queue_.front()->data->size();
209 PopQueue(); 225 return PopQueue();
210 } 226 }
211 227
212 void BufferedDatagramWriter::OnError(int result) { 228 void BufferedDatagramWriter::OnError(int result) {
213 // Nothing to do here. 229 // Nothing to do here.
214 } 230 }
215 231
216 BufferedDatagramWriter::~BufferedDatagramWriter() {} 232 BufferedDatagramWriter::~BufferedDatagramWriter() {
233 }
217 234
218 } // namespace protocol 235 } // namespace protocol
219 } // namespace remoting 236 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698