| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "jingle/glue/pseudotcp_adapter.h" | |
| 6 | |
| 7 #include <vector> | |
| 8 | |
| 9 #include "base/bind.h" | |
| 10 #include "base/bind_helpers.h" | |
| 11 #include "base/compiler_specific.h" | |
| 12 #include "jingle/glue/thread_wrapper.h" | |
| 13 #include "net/base/io_buffer.h" | |
| 14 #include "net/base/net_errors.h" | |
| 15 #include "net/base/test_completion_callback.h" | |
| 16 #include "net/udp/udp_socket.h" | |
| 17 #include "testing/gmock/include/gmock/gmock.h" | |
| 18 #include "testing/gtest/include/gtest/gtest.h" | |
| 19 | |
| 20 | |
| 21 namespace jingle_glue { | |
| 22 namespace { | |
| 23 class FakeSocket; | |
| 24 } // namespace | |
| 25 } // namespace jingle_glue | |
| 26 | |
| 27 namespace jingle_glue { | |
| 28 | |
| 29 namespace { | |
| 30 | |
| 31 const int kMessageSize = 1024; | |
| 32 const int kMessages = 100; | |
| 33 const int kTestDataSize = kMessages * kMessageSize; | |
| 34 | |
| 35 class RateLimiter { | |
| 36 public: | |
| 37 virtual ~RateLimiter() { }; | |
| 38 // Returns true if the new packet needs to be dropped, false otherwise. | |
| 39 virtual bool DropNextPacket() = 0; | |
| 40 }; | |
| 41 | |
| 42 class LeakyBucket : public RateLimiter { | |
| 43 public: | |
| 44 // |rate| is in drops per second. | |
| 45 LeakyBucket(double volume, double rate) | |
| 46 : volume_(volume), | |
| 47 rate_(rate), | |
| 48 level_(0.0), | |
| 49 last_update_(base::TimeTicks::Now()) { | |
| 50 } | |
| 51 | |
| 52 ~LeakyBucket() override {} | |
| 53 | |
| 54 bool DropNextPacket() override { | |
| 55 base::TimeTicks now = base::TimeTicks::Now(); | |
| 56 double interval = (now - last_update_).InSecondsF(); | |
| 57 last_update_ = now; | |
| 58 level_ = level_ + 1.0 - interval * rate_; | |
| 59 if (level_ > volume_) { | |
| 60 level_ = volume_; | |
| 61 return true; | |
| 62 } else if (level_ < 0.0) { | |
| 63 level_ = 0.0; | |
| 64 } | |
| 65 return false; | |
| 66 } | |
| 67 | |
| 68 private: | |
| 69 double volume_; | |
| 70 double rate_; | |
| 71 double level_; | |
| 72 base::TimeTicks last_update_; | |
| 73 }; | |
| 74 | |
| 75 class FakeSocket : public net::Socket { | |
| 76 public: | |
| 77 FakeSocket() | |
| 78 : rate_limiter_(NULL), | |
| 79 latency_ms_(0) { | |
| 80 } | |
| 81 ~FakeSocket() override {} | |
| 82 | |
| 83 void AppendInputPacket(const std::vector<char>& data) { | |
| 84 if (rate_limiter_ && rate_limiter_->DropNextPacket()) | |
| 85 return; // Lose the packet. | |
| 86 | |
| 87 if (!read_callback_.is_null()) { | |
| 88 int size = std::min(read_buffer_size_, static_cast<int>(data.size())); | |
| 89 memcpy(read_buffer_->data(), &data[0], data.size()); | |
| 90 net::CompletionCallback cb = read_callback_; | |
| 91 read_callback_.Reset(); | |
| 92 read_buffer_ = NULL; | |
| 93 cb.Run(size); | |
| 94 } else { | |
| 95 incoming_packets_.push_back(data); | |
| 96 } | |
| 97 } | |
| 98 | |
| 99 void Connect(FakeSocket* peer_socket) { | |
| 100 peer_socket_ = peer_socket; | |
| 101 } | |
| 102 | |
| 103 void set_rate_limiter(RateLimiter* rate_limiter) { | |
| 104 rate_limiter_ = rate_limiter; | |
| 105 }; | |
| 106 | |
| 107 void set_latency(int latency_ms) { latency_ms_ = latency_ms; }; | |
| 108 | |
| 109 // net::Socket interface. | |
| 110 int Read(net::IOBuffer* buf, | |
| 111 int buf_len, | |
| 112 const net::CompletionCallback& callback) override { | |
| 113 CHECK(read_callback_.is_null()); | |
| 114 CHECK(buf); | |
| 115 | |
| 116 if (incoming_packets_.size() > 0) { | |
| 117 scoped_refptr<net::IOBuffer> buffer(buf); | |
| 118 int size = std::min( | |
| 119 static_cast<int>(incoming_packets_.front().size()), buf_len); | |
| 120 memcpy(buffer->data(), &*incoming_packets_.front().begin(), size); | |
| 121 incoming_packets_.pop_front(); | |
| 122 return size; | |
| 123 } else { | |
| 124 read_callback_ = callback; | |
| 125 read_buffer_ = buf; | |
| 126 read_buffer_size_ = buf_len; | |
| 127 return net::ERR_IO_PENDING; | |
| 128 } | |
| 129 } | |
| 130 | |
| 131 int Write(net::IOBuffer* buf, | |
| 132 int buf_len, | |
| 133 const net::CompletionCallback& callback) override { | |
| 134 DCHECK(buf); | |
| 135 if (peer_socket_) { | |
| 136 base::MessageLoop::current()->PostDelayedTask( | |
| 137 FROM_HERE, | |
| 138 base::Bind(&FakeSocket::AppendInputPacket, | |
| 139 base::Unretained(peer_socket_), | |
| 140 std::vector<char>(buf->data(), buf->data() + buf_len)), | |
| 141 base::TimeDelta::FromMilliseconds(latency_ms_)); | |
| 142 } | |
| 143 | |
| 144 return buf_len; | |
| 145 } | |
| 146 | |
| 147 int SetReceiveBufferSize(int32 size) override { | |
| 148 NOTIMPLEMENTED(); | |
| 149 return net::ERR_NOT_IMPLEMENTED; | |
| 150 } | |
| 151 int SetSendBufferSize(int32 size) override { | |
| 152 NOTIMPLEMENTED(); | |
| 153 return net::ERR_NOT_IMPLEMENTED; | |
| 154 } | |
| 155 | |
| 156 private: | |
| 157 scoped_refptr<net::IOBuffer> read_buffer_; | |
| 158 int read_buffer_size_; | |
| 159 net::CompletionCallback read_callback_; | |
| 160 | |
| 161 std::deque<std::vector<char> > incoming_packets_; | |
| 162 | |
| 163 FakeSocket* peer_socket_; | |
| 164 RateLimiter* rate_limiter_; | |
| 165 int latency_ms_; | |
| 166 }; | |
| 167 | |
| 168 class TCPChannelTester : public base::RefCountedThreadSafe<TCPChannelTester> { | |
| 169 public: | |
| 170 TCPChannelTester(base::MessageLoop* message_loop, | |
| 171 net::Socket* client_socket, | |
| 172 net::Socket* host_socket) | |
| 173 : message_loop_(message_loop), | |
| 174 host_socket_(host_socket), | |
| 175 client_socket_(client_socket), | |
| 176 done_(false), | |
| 177 write_errors_(0), | |
| 178 read_errors_(0) {} | |
| 179 | |
| 180 void Start() { | |
| 181 message_loop_->PostTask( | |
| 182 FROM_HERE, base::Bind(&TCPChannelTester::DoStart, this)); | |
| 183 } | |
| 184 | |
| 185 void CheckResults() { | |
| 186 EXPECT_EQ(0, write_errors_); | |
| 187 EXPECT_EQ(0, read_errors_); | |
| 188 | |
| 189 ASSERT_EQ(kTestDataSize + kMessageSize, input_buffer_->capacity()); | |
| 190 | |
| 191 output_buffer_->SetOffset(0); | |
| 192 ASSERT_EQ(kTestDataSize, output_buffer_->size()); | |
| 193 | |
| 194 EXPECT_EQ(0, memcmp(output_buffer_->data(), | |
| 195 input_buffer_->StartOfBuffer(), kTestDataSize)); | |
| 196 } | |
| 197 | |
| 198 protected: | |
| 199 virtual ~TCPChannelTester() {} | |
| 200 | |
| 201 void Done() { | |
| 202 done_ = true; | |
| 203 message_loop_->PostTask(FROM_HERE, base::MessageLoop::QuitClosure()); | |
| 204 } | |
| 205 | |
| 206 void DoStart() { | |
| 207 InitBuffers(); | |
| 208 DoRead(); | |
| 209 DoWrite(); | |
| 210 } | |
| 211 | |
| 212 void InitBuffers() { | |
| 213 output_buffer_ = new net::DrainableIOBuffer( | |
| 214 new net::IOBuffer(kTestDataSize), kTestDataSize); | |
| 215 memset(output_buffer_->data(), 123, kTestDataSize); | |
| 216 | |
| 217 input_buffer_ = new net::GrowableIOBuffer(); | |
| 218 // Always keep kMessageSize bytes available at the end of the input buffer. | |
| 219 input_buffer_->SetCapacity(kMessageSize); | |
| 220 } | |
| 221 | |
| 222 void DoWrite() { | |
| 223 int result = 1; | |
| 224 while (result > 0) { | |
| 225 if (output_buffer_->BytesRemaining() == 0) | |
| 226 break; | |
| 227 | |
| 228 int bytes_to_write = std::min(output_buffer_->BytesRemaining(), | |
| 229 kMessageSize); | |
| 230 result = client_socket_->Write( | |
| 231 output_buffer_.get(), | |
| 232 bytes_to_write, | |
| 233 base::Bind(&TCPChannelTester::OnWritten, base::Unretained(this))); | |
| 234 HandleWriteResult(result); | |
| 235 } | |
| 236 } | |
| 237 | |
| 238 void OnWritten(int result) { | |
| 239 HandleWriteResult(result); | |
| 240 DoWrite(); | |
| 241 } | |
| 242 | |
| 243 void HandleWriteResult(int result) { | |
| 244 if (result <= 0 && result != net::ERR_IO_PENDING) { | |
| 245 LOG(ERROR) << "Received error " << result << " when trying to write"; | |
| 246 write_errors_++; | |
| 247 Done(); | |
| 248 } else if (result > 0) { | |
| 249 output_buffer_->DidConsume(result); | |
| 250 } | |
| 251 } | |
| 252 | |
| 253 void DoRead() { | |
| 254 int result = 1; | |
| 255 while (result > 0) { | |
| 256 input_buffer_->set_offset(input_buffer_->capacity() - kMessageSize); | |
| 257 | |
| 258 result = host_socket_->Read( | |
| 259 input_buffer_.get(), | |
| 260 kMessageSize, | |
| 261 base::Bind(&TCPChannelTester::OnRead, base::Unretained(this))); | |
| 262 HandleReadResult(result); | |
| 263 }; | |
| 264 } | |
| 265 | |
| 266 void OnRead(int result) { | |
| 267 HandleReadResult(result); | |
| 268 DoRead(); | |
| 269 } | |
| 270 | |
| 271 void HandleReadResult(int result) { | |
| 272 if (result <= 0 && result != net::ERR_IO_PENDING) { | |
| 273 if (!done_) { | |
| 274 LOG(ERROR) << "Received error " << result << " when trying to read"; | |
| 275 read_errors_++; | |
| 276 Done(); | |
| 277 } | |
| 278 } else if (result > 0) { | |
| 279 // Allocate memory for the next read. | |
| 280 input_buffer_->SetCapacity(input_buffer_->capacity() + result); | |
| 281 if (input_buffer_->capacity() == kTestDataSize + kMessageSize) | |
| 282 Done(); | |
| 283 } | |
| 284 } | |
| 285 | |
| 286 private: | |
| 287 friend class base::RefCountedThreadSafe<TCPChannelTester>; | |
| 288 | |
| 289 base::MessageLoop* message_loop_; | |
| 290 net::Socket* host_socket_; | |
| 291 net::Socket* client_socket_; | |
| 292 bool done_; | |
| 293 | |
| 294 scoped_refptr<net::DrainableIOBuffer> output_buffer_; | |
| 295 scoped_refptr<net::GrowableIOBuffer> input_buffer_; | |
| 296 | |
| 297 int write_errors_; | |
| 298 int read_errors_; | |
| 299 }; | |
| 300 | |
| 301 class PseudoTcpAdapterTest : public testing::Test { | |
| 302 protected: | |
| 303 void SetUp() override { | |
| 304 JingleThreadWrapper::EnsureForCurrentMessageLoop(); | |
| 305 | |
| 306 host_socket_ = new FakeSocket(); | |
| 307 client_socket_ = new FakeSocket(); | |
| 308 | |
| 309 host_socket_->Connect(client_socket_); | |
| 310 client_socket_->Connect(host_socket_); | |
| 311 | |
| 312 host_pseudotcp_.reset(new PseudoTcpAdapter(host_socket_)); | |
| 313 client_pseudotcp_.reset(new PseudoTcpAdapter(client_socket_)); | |
| 314 } | |
| 315 | |
| 316 FakeSocket* host_socket_; | |
| 317 FakeSocket* client_socket_; | |
| 318 | |
| 319 scoped_ptr<PseudoTcpAdapter> host_pseudotcp_; | |
| 320 scoped_ptr<PseudoTcpAdapter> client_pseudotcp_; | |
| 321 base::MessageLoop message_loop_; | |
| 322 }; | |
| 323 | |
| 324 TEST_F(PseudoTcpAdapterTest, DataTransfer) { | |
| 325 net::TestCompletionCallback host_connect_cb; | |
| 326 net::TestCompletionCallback client_connect_cb; | |
| 327 | |
| 328 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback()); | |
| 329 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback()); | |
| 330 | |
| 331 if (rv1 == net::ERR_IO_PENDING) | |
| 332 rv1 = host_connect_cb.WaitForResult(); | |
| 333 if (rv2 == net::ERR_IO_PENDING) | |
| 334 rv2 = client_connect_cb.WaitForResult(); | |
| 335 ASSERT_EQ(net::OK, rv1); | |
| 336 ASSERT_EQ(net::OK, rv2); | |
| 337 | |
| 338 scoped_refptr<TCPChannelTester> tester = | |
| 339 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(), | |
| 340 client_pseudotcp_.get()); | |
| 341 | |
| 342 tester->Start(); | |
| 343 message_loop_.Run(); | |
| 344 tester->CheckResults(); | |
| 345 } | |
| 346 | |
| 347 TEST_F(PseudoTcpAdapterTest, LimitedChannel) { | |
| 348 const int kLatencyMs = 20; | |
| 349 const int kPacketsPerSecond = 400; | |
| 350 const int kBurstPackets = 10; | |
| 351 | |
| 352 LeakyBucket host_limiter(kBurstPackets, kPacketsPerSecond); | |
| 353 host_socket_->set_latency(kLatencyMs); | |
| 354 host_socket_->set_rate_limiter(&host_limiter); | |
| 355 | |
| 356 LeakyBucket client_limiter(kBurstPackets, kPacketsPerSecond); | |
| 357 host_socket_->set_latency(kLatencyMs); | |
| 358 client_socket_->set_rate_limiter(&client_limiter); | |
| 359 | |
| 360 net::TestCompletionCallback host_connect_cb; | |
| 361 net::TestCompletionCallback client_connect_cb; | |
| 362 | |
| 363 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback()); | |
| 364 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback()); | |
| 365 | |
| 366 if (rv1 == net::ERR_IO_PENDING) | |
| 367 rv1 = host_connect_cb.WaitForResult(); | |
| 368 if (rv2 == net::ERR_IO_PENDING) | |
| 369 rv2 = client_connect_cb.WaitForResult(); | |
| 370 ASSERT_EQ(net::OK, rv1); | |
| 371 ASSERT_EQ(net::OK, rv2); | |
| 372 | |
| 373 scoped_refptr<TCPChannelTester> tester = | |
| 374 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(), | |
| 375 client_pseudotcp_.get()); | |
| 376 | |
| 377 tester->Start(); | |
| 378 message_loop_.Run(); | |
| 379 tester->CheckResults(); | |
| 380 } | |
| 381 | |
| 382 class DeleteOnConnected { | |
| 383 public: | |
| 384 DeleteOnConnected(base::MessageLoop* message_loop, | |
| 385 scoped_ptr<PseudoTcpAdapter>* adapter) | |
| 386 : message_loop_(message_loop), adapter_(adapter) {} | |
| 387 void OnConnected(int error) { | |
| 388 adapter_->reset(); | |
| 389 message_loop_->PostTask(FROM_HERE, base::MessageLoop::QuitClosure()); | |
| 390 } | |
| 391 base::MessageLoop* message_loop_; | |
| 392 scoped_ptr<PseudoTcpAdapter>* adapter_; | |
| 393 }; | |
| 394 | |
| 395 TEST_F(PseudoTcpAdapterTest, DeleteOnConnected) { | |
| 396 // This test verifies that deleting the adapter mid-callback doesn't lead | |
| 397 // to deleted structures being touched as the stack unrolls, so the failure | |
| 398 // mode is a crash rather than a normal test failure. | |
| 399 net::TestCompletionCallback client_connect_cb; | |
| 400 DeleteOnConnected host_delete(&message_loop_, &host_pseudotcp_); | |
| 401 | |
| 402 host_pseudotcp_->Connect(base::Bind(&DeleteOnConnected::OnConnected, | |
| 403 base::Unretained(&host_delete))); | |
| 404 client_pseudotcp_->Connect(client_connect_cb.callback()); | |
| 405 message_loop_.Run(); | |
| 406 | |
| 407 ASSERT_EQ(NULL, host_pseudotcp_.get()); | |
| 408 } | |
| 409 | |
| 410 // Verify that we can send/receive data with the write-waits-for-send | |
| 411 // flag set. | |
| 412 TEST_F(PseudoTcpAdapterTest, WriteWaitsForSendLetsDataThrough) { | |
| 413 net::TestCompletionCallback host_connect_cb; | |
| 414 net::TestCompletionCallback client_connect_cb; | |
| 415 | |
| 416 host_pseudotcp_->SetWriteWaitsForSend(true); | |
| 417 client_pseudotcp_->SetWriteWaitsForSend(true); | |
| 418 | |
| 419 // Disable Nagle's algorithm because the test is slow when it is | |
| 420 // enabled. | |
| 421 host_pseudotcp_->SetNoDelay(true); | |
| 422 | |
| 423 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback()); | |
| 424 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback()); | |
| 425 | |
| 426 if (rv1 == net::ERR_IO_PENDING) | |
| 427 rv1 = host_connect_cb.WaitForResult(); | |
| 428 if (rv2 == net::ERR_IO_PENDING) | |
| 429 rv2 = client_connect_cb.WaitForResult(); | |
| 430 ASSERT_EQ(net::OK, rv1); | |
| 431 ASSERT_EQ(net::OK, rv2); | |
| 432 | |
| 433 scoped_refptr<TCPChannelTester> tester = | |
| 434 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(), | |
| 435 client_pseudotcp_.get()); | |
| 436 | |
| 437 tester->Start(); | |
| 438 message_loop_.Run(); | |
| 439 tester->CheckResults(); | |
| 440 } | |
| 441 | |
| 442 } // namespace | |
| 443 | |
| 444 } // namespace jingle_glue | |
| OLD | NEW |