OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "jingle/glue/pseudotcp_adapter.h" | |
6 | |
7 #include <vector> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/bind_helpers.h" | |
11 #include "base/compiler_specific.h" | |
12 #include "jingle/glue/thread_wrapper.h" | |
13 #include "net/base/io_buffer.h" | |
14 #include "net/base/net_errors.h" | |
15 #include "net/base/test_completion_callback.h" | |
16 #include "net/udp/udp_socket.h" | |
17 #include "testing/gmock/include/gmock/gmock.h" | |
18 #include "testing/gtest/include/gtest/gtest.h" | |
19 | |
20 | |
21 namespace jingle_glue { | |
22 namespace { | |
23 class FakeSocket; | |
24 } // namespace | |
25 } // namespace jingle_glue | |
26 | |
27 namespace jingle_glue { | |
28 | |
29 namespace { | |
30 | |
31 const int kMessageSize = 1024; | |
32 const int kMessages = 100; | |
33 const int kTestDataSize = kMessages * kMessageSize; | |
34 | |
35 class RateLimiter { | |
36 public: | |
37 virtual ~RateLimiter() { }; | |
38 // Returns true if the new packet needs to be dropped, false otherwise. | |
39 virtual bool DropNextPacket() = 0; | |
40 }; | |
41 | |
42 class LeakyBucket : public RateLimiter { | |
43 public: | |
44 // |rate| is in drops per second. | |
45 LeakyBucket(double volume, double rate) | |
46 : volume_(volume), | |
47 rate_(rate), | |
48 level_(0.0), | |
49 last_update_(base::TimeTicks::Now()) { | |
50 } | |
51 | |
52 ~LeakyBucket() override {} | |
53 | |
54 bool DropNextPacket() override { | |
55 base::TimeTicks now = base::TimeTicks::Now(); | |
56 double interval = (now - last_update_).InSecondsF(); | |
57 last_update_ = now; | |
58 level_ = level_ + 1.0 - interval * rate_; | |
59 if (level_ > volume_) { | |
60 level_ = volume_; | |
61 return true; | |
62 } else if (level_ < 0.0) { | |
63 level_ = 0.0; | |
64 } | |
65 return false; | |
66 } | |
67 | |
68 private: | |
69 double volume_; | |
70 double rate_; | |
71 double level_; | |
72 base::TimeTicks last_update_; | |
73 }; | |
74 | |
75 class FakeSocket : public net::Socket { | |
76 public: | |
77 FakeSocket() | |
78 : rate_limiter_(NULL), | |
79 latency_ms_(0) { | |
80 } | |
81 ~FakeSocket() override {} | |
82 | |
83 void AppendInputPacket(const std::vector<char>& data) { | |
84 if (rate_limiter_ && rate_limiter_->DropNextPacket()) | |
85 return; // Lose the packet. | |
86 | |
87 if (!read_callback_.is_null()) { | |
88 int size = std::min(read_buffer_size_, static_cast<int>(data.size())); | |
89 memcpy(read_buffer_->data(), &data[0], data.size()); | |
90 net::CompletionCallback cb = read_callback_; | |
91 read_callback_.Reset(); | |
92 read_buffer_ = NULL; | |
93 cb.Run(size); | |
94 } else { | |
95 incoming_packets_.push_back(data); | |
96 } | |
97 } | |
98 | |
99 void Connect(FakeSocket* peer_socket) { | |
100 peer_socket_ = peer_socket; | |
101 } | |
102 | |
103 void set_rate_limiter(RateLimiter* rate_limiter) { | |
104 rate_limiter_ = rate_limiter; | |
105 }; | |
106 | |
107 void set_latency(int latency_ms) { latency_ms_ = latency_ms; }; | |
108 | |
109 // net::Socket interface. | |
110 int Read(net::IOBuffer* buf, | |
111 int buf_len, | |
112 const net::CompletionCallback& callback) override { | |
113 CHECK(read_callback_.is_null()); | |
114 CHECK(buf); | |
115 | |
116 if (incoming_packets_.size() > 0) { | |
117 scoped_refptr<net::IOBuffer> buffer(buf); | |
118 int size = std::min( | |
119 static_cast<int>(incoming_packets_.front().size()), buf_len); | |
120 memcpy(buffer->data(), &*incoming_packets_.front().begin(), size); | |
121 incoming_packets_.pop_front(); | |
122 return size; | |
123 } else { | |
124 read_callback_ = callback; | |
125 read_buffer_ = buf; | |
126 read_buffer_size_ = buf_len; | |
127 return net::ERR_IO_PENDING; | |
128 } | |
129 } | |
130 | |
131 int Write(net::IOBuffer* buf, | |
132 int buf_len, | |
133 const net::CompletionCallback& callback) override { | |
134 DCHECK(buf); | |
135 if (peer_socket_) { | |
136 base::MessageLoop::current()->PostDelayedTask( | |
137 FROM_HERE, | |
138 base::Bind(&FakeSocket::AppendInputPacket, | |
139 base::Unretained(peer_socket_), | |
140 std::vector<char>(buf->data(), buf->data() + buf_len)), | |
141 base::TimeDelta::FromMilliseconds(latency_ms_)); | |
142 } | |
143 | |
144 return buf_len; | |
145 } | |
146 | |
147 int SetReceiveBufferSize(int32 size) override { | |
148 NOTIMPLEMENTED(); | |
149 return net::ERR_NOT_IMPLEMENTED; | |
150 } | |
151 int SetSendBufferSize(int32 size) override { | |
152 NOTIMPLEMENTED(); | |
153 return net::ERR_NOT_IMPLEMENTED; | |
154 } | |
155 | |
156 private: | |
157 scoped_refptr<net::IOBuffer> read_buffer_; | |
158 int read_buffer_size_; | |
159 net::CompletionCallback read_callback_; | |
160 | |
161 std::deque<std::vector<char> > incoming_packets_; | |
162 | |
163 FakeSocket* peer_socket_; | |
164 RateLimiter* rate_limiter_; | |
165 int latency_ms_; | |
166 }; | |
167 | |
168 class TCPChannelTester : public base::RefCountedThreadSafe<TCPChannelTester> { | |
169 public: | |
170 TCPChannelTester(base::MessageLoop* message_loop, | |
171 net::Socket* client_socket, | |
172 net::Socket* host_socket) | |
173 : message_loop_(message_loop), | |
174 host_socket_(host_socket), | |
175 client_socket_(client_socket), | |
176 done_(false), | |
177 write_errors_(0), | |
178 read_errors_(0) {} | |
179 | |
180 void Start() { | |
181 message_loop_->PostTask( | |
182 FROM_HERE, base::Bind(&TCPChannelTester::DoStart, this)); | |
183 } | |
184 | |
185 void CheckResults() { | |
186 EXPECT_EQ(0, write_errors_); | |
187 EXPECT_EQ(0, read_errors_); | |
188 | |
189 ASSERT_EQ(kTestDataSize + kMessageSize, input_buffer_->capacity()); | |
190 | |
191 output_buffer_->SetOffset(0); | |
192 ASSERT_EQ(kTestDataSize, output_buffer_->size()); | |
193 | |
194 EXPECT_EQ(0, memcmp(output_buffer_->data(), | |
195 input_buffer_->StartOfBuffer(), kTestDataSize)); | |
196 } | |
197 | |
198 protected: | |
199 virtual ~TCPChannelTester() {} | |
200 | |
201 void Done() { | |
202 done_ = true; | |
203 message_loop_->PostTask(FROM_HERE, base::MessageLoop::QuitClosure()); | |
204 } | |
205 | |
206 void DoStart() { | |
207 InitBuffers(); | |
208 DoRead(); | |
209 DoWrite(); | |
210 } | |
211 | |
212 void InitBuffers() { | |
213 output_buffer_ = new net::DrainableIOBuffer( | |
214 new net::IOBuffer(kTestDataSize), kTestDataSize); | |
215 memset(output_buffer_->data(), 123, kTestDataSize); | |
216 | |
217 input_buffer_ = new net::GrowableIOBuffer(); | |
218 // Always keep kMessageSize bytes available at the end of the input buffer. | |
219 input_buffer_->SetCapacity(kMessageSize); | |
220 } | |
221 | |
222 void DoWrite() { | |
223 int result = 1; | |
224 while (result > 0) { | |
225 if (output_buffer_->BytesRemaining() == 0) | |
226 break; | |
227 | |
228 int bytes_to_write = std::min(output_buffer_->BytesRemaining(), | |
229 kMessageSize); | |
230 result = client_socket_->Write( | |
231 output_buffer_.get(), | |
232 bytes_to_write, | |
233 base::Bind(&TCPChannelTester::OnWritten, base::Unretained(this))); | |
234 HandleWriteResult(result); | |
235 } | |
236 } | |
237 | |
238 void OnWritten(int result) { | |
239 HandleWriteResult(result); | |
240 DoWrite(); | |
241 } | |
242 | |
243 void HandleWriteResult(int result) { | |
244 if (result <= 0 && result != net::ERR_IO_PENDING) { | |
245 LOG(ERROR) << "Received error " << result << " when trying to write"; | |
246 write_errors_++; | |
247 Done(); | |
248 } else if (result > 0) { | |
249 output_buffer_->DidConsume(result); | |
250 } | |
251 } | |
252 | |
253 void DoRead() { | |
254 int result = 1; | |
255 while (result > 0) { | |
256 input_buffer_->set_offset(input_buffer_->capacity() - kMessageSize); | |
257 | |
258 result = host_socket_->Read( | |
259 input_buffer_.get(), | |
260 kMessageSize, | |
261 base::Bind(&TCPChannelTester::OnRead, base::Unretained(this))); | |
262 HandleReadResult(result); | |
263 }; | |
264 } | |
265 | |
266 void OnRead(int result) { | |
267 HandleReadResult(result); | |
268 DoRead(); | |
269 } | |
270 | |
271 void HandleReadResult(int result) { | |
272 if (result <= 0 && result != net::ERR_IO_PENDING) { | |
273 if (!done_) { | |
274 LOG(ERROR) << "Received error " << result << " when trying to read"; | |
275 read_errors_++; | |
276 Done(); | |
277 } | |
278 } else if (result > 0) { | |
279 // Allocate memory for the next read. | |
280 input_buffer_->SetCapacity(input_buffer_->capacity() + result); | |
281 if (input_buffer_->capacity() == kTestDataSize + kMessageSize) | |
282 Done(); | |
283 } | |
284 } | |
285 | |
286 private: | |
287 friend class base::RefCountedThreadSafe<TCPChannelTester>; | |
288 | |
289 base::MessageLoop* message_loop_; | |
290 net::Socket* host_socket_; | |
291 net::Socket* client_socket_; | |
292 bool done_; | |
293 | |
294 scoped_refptr<net::DrainableIOBuffer> output_buffer_; | |
295 scoped_refptr<net::GrowableIOBuffer> input_buffer_; | |
296 | |
297 int write_errors_; | |
298 int read_errors_; | |
299 }; | |
300 | |
301 class PseudoTcpAdapterTest : public testing::Test { | |
302 protected: | |
303 void SetUp() override { | |
304 JingleThreadWrapper::EnsureForCurrentMessageLoop(); | |
305 | |
306 host_socket_ = new FakeSocket(); | |
307 client_socket_ = new FakeSocket(); | |
308 | |
309 host_socket_->Connect(client_socket_); | |
310 client_socket_->Connect(host_socket_); | |
311 | |
312 host_pseudotcp_.reset(new PseudoTcpAdapter(host_socket_)); | |
313 client_pseudotcp_.reset(new PseudoTcpAdapter(client_socket_)); | |
314 } | |
315 | |
316 FakeSocket* host_socket_; | |
317 FakeSocket* client_socket_; | |
318 | |
319 scoped_ptr<PseudoTcpAdapter> host_pseudotcp_; | |
320 scoped_ptr<PseudoTcpAdapter> client_pseudotcp_; | |
321 base::MessageLoop message_loop_; | |
322 }; | |
323 | |
324 TEST_F(PseudoTcpAdapterTest, DataTransfer) { | |
325 net::TestCompletionCallback host_connect_cb; | |
326 net::TestCompletionCallback client_connect_cb; | |
327 | |
328 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback()); | |
329 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback()); | |
330 | |
331 if (rv1 == net::ERR_IO_PENDING) | |
332 rv1 = host_connect_cb.WaitForResult(); | |
333 if (rv2 == net::ERR_IO_PENDING) | |
334 rv2 = client_connect_cb.WaitForResult(); | |
335 ASSERT_EQ(net::OK, rv1); | |
336 ASSERT_EQ(net::OK, rv2); | |
337 | |
338 scoped_refptr<TCPChannelTester> tester = | |
339 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(), | |
340 client_pseudotcp_.get()); | |
341 | |
342 tester->Start(); | |
343 message_loop_.Run(); | |
344 tester->CheckResults(); | |
345 } | |
346 | |
347 TEST_F(PseudoTcpAdapterTest, LimitedChannel) { | |
348 const int kLatencyMs = 20; | |
349 const int kPacketsPerSecond = 400; | |
350 const int kBurstPackets = 10; | |
351 | |
352 LeakyBucket host_limiter(kBurstPackets, kPacketsPerSecond); | |
353 host_socket_->set_latency(kLatencyMs); | |
354 host_socket_->set_rate_limiter(&host_limiter); | |
355 | |
356 LeakyBucket client_limiter(kBurstPackets, kPacketsPerSecond); | |
357 host_socket_->set_latency(kLatencyMs); | |
358 client_socket_->set_rate_limiter(&client_limiter); | |
359 | |
360 net::TestCompletionCallback host_connect_cb; | |
361 net::TestCompletionCallback client_connect_cb; | |
362 | |
363 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback()); | |
364 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback()); | |
365 | |
366 if (rv1 == net::ERR_IO_PENDING) | |
367 rv1 = host_connect_cb.WaitForResult(); | |
368 if (rv2 == net::ERR_IO_PENDING) | |
369 rv2 = client_connect_cb.WaitForResult(); | |
370 ASSERT_EQ(net::OK, rv1); | |
371 ASSERT_EQ(net::OK, rv2); | |
372 | |
373 scoped_refptr<TCPChannelTester> tester = | |
374 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(), | |
375 client_pseudotcp_.get()); | |
376 | |
377 tester->Start(); | |
378 message_loop_.Run(); | |
379 tester->CheckResults(); | |
380 } | |
381 | |
382 class DeleteOnConnected { | |
383 public: | |
384 DeleteOnConnected(base::MessageLoop* message_loop, | |
385 scoped_ptr<PseudoTcpAdapter>* adapter) | |
386 : message_loop_(message_loop), adapter_(adapter) {} | |
387 void OnConnected(int error) { | |
388 adapter_->reset(); | |
389 message_loop_->PostTask(FROM_HERE, base::MessageLoop::QuitClosure()); | |
390 } | |
391 base::MessageLoop* message_loop_; | |
392 scoped_ptr<PseudoTcpAdapter>* adapter_; | |
393 }; | |
394 | |
395 TEST_F(PseudoTcpAdapterTest, DeleteOnConnected) { | |
396 // This test verifies that deleting the adapter mid-callback doesn't lead | |
397 // to deleted structures being touched as the stack unrolls, so the failure | |
398 // mode is a crash rather than a normal test failure. | |
399 net::TestCompletionCallback client_connect_cb; | |
400 DeleteOnConnected host_delete(&message_loop_, &host_pseudotcp_); | |
401 | |
402 host_pseudotcp_->Connect(base::Bind(&DeleteOnConnected::OnConnected, | |
403 base::Unretained(&host_delete))); | |
404 client_pseudotcp_->Connect(client_connect_cb.callback()); | |
405 message_loop_.Run(); | |
406 | |
407 ASSERT_EQ(NULL, host_pseudotcp_.get()); | |
408 } | |
409 | |
410 // Verify that we can send/receive data with the write-waits-for-send | |
411 // flag set. | |
412 TEST_F(PseudoTcpAdapterTest, WriteWaitsForSendLetsDataThrough) { | |
413 net::TestCompletionCallback host_connect_cb; | |
414 net::TestCompletionCallback client_connect_cb; | |
415 | |
416 host_pseudotcp_->SetWriteWaitsForSend(true); | |
417 client_pseudotcp_->SetWriteWaitsForSend(true); | |
418 | |
419 // Disable Nagle's algorithm because the test is slow when it is | |
420 // enabled. | |
421 host_pseudotcp_->SetNoDelay(true); | |
422 | |
423 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback()); | |
424 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback()); | |
425 | |
426 if (rv1 == net::ERR_IO_PENDING) | |
427 rv1 = host_connect_cb.WaitForResult(); | |
428 if (rv2 == net::ERR_IO_PENDING) | |
429 rv2 = client_connect_cb.WaitForResult(); | |
430 ASSERT_EQ(net::OK, rv1); | |
431 ASSERT_EQ(net::OK, rv2); | |
432 | |
433 scoped_refptr<TCPChannelTester> tester = | |
434 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(), | |
435 client_pseudotcp_.get()); | |
436 | |
437 tester->Start(); | |
438 message_loop_.Run(); | |
439 tester->CheckResults(); | |
440 } | |
441 | |
442 } // namespace | |
443 | |
444 } // namespace jingle_glue | |
OLD | NEW |