OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
| 5 #include "base/logging.h" |
5 #include "base/macros.h" | 6 #include "base/macros.h" |
6 #include "base/memory/linked_ptr.h" | 7 #include "base/memory/linked_ptr.h" |
7 #include "base/memory/ref_counted.h" | 8 #include "base/memory/ref_counted.h" |
8 #include "base/memory/scoped_ptr.h" | 9 #include "base/memory/scoped_ptr.h" |
9 #include "base/run_loop.h" | 10 #include "base/run_loop.h" |
10 #include "base/strings/string_util.h" | 11 #include "base/strings/string_util.h" |
| 12 #include "base/strings/stringprintf.h" |
11 #include "mojo/application/public/cpp/application_connection.h" | 13 #include "mojo/application/public/cpp/application_connection.h" |
12 #include "mojo/application/public/cpp/application_impl.h" | 14 #include "mojo/application/public/cpp/application_impl.h" |
13 #include "mojo/application/public/cpp/application_test_base.h" | 15 #include "mojo/application/public/cpp/application_test_base.h" |
14 #include "mojo/common/data_pipe_utils.h" | 16 #include "mojo/common/data_pipe_utils.h" |
15 #include "mojo/services/network/net_address_type_converters.h" | 17 #include "mojo/services/network/net_address_type_converters.h" |
| 18 #include "mojo/services/network/public/cpp/web_socket_read_queue.h" |
| 19 #include "mojo/services/network/public/cpp/web_socket_write_queue.h" |
| 20 #include "mojo/services/network/public/interfaces/http_connection.mojom.h" |
| 21 #include "mojo/services/network/public/interfaces/http_message.mojom.h" |
16 #include "mojo/services/network/public/interfaces/http_server.mojom.h" | 22 #include "mojo/services/network/public/interfaces/http_server.mojom.h" |
| 23 #include "mojo/services/network/public/interfaces/net_address.mojom.h" |
17 #include "mojo/services/network/public/interfaces/network_service.mojom.h" | 24 #include "mojo/services/network/public/interfaces/network_service.mojom.h" |
| 25 #include "mojo/services/network/public/interfaces/web_socket.mojom.h" |
18 #include "net/base/io_buffer.h" | 26 #include "net/base/io_buffer.h" |
19 #include "net/base/net_errors.h" | 27 #include "net/base/net_errors.h" |
20 #include "net/base/test_completion_callback.h" | 28 #include "net/base/test_completion_callback.h" |
21 #include "net/http/http_response_headers.h" | 29 #include "net/http/http_response_headers.h" |
22 #include "net/http/http_util.h" | 30 #include "net/http/http_util.h" |
23 #include "net/socket/tcp_client_socket.h" | 31 #include "net/socket/tcp_client_socket.h" |
24 #include "testing/gtest/include/gtest/gtest.h" | 32 #include "testing/gtest/include/gtest/gtest.h" |
25 | 33 |
26 namespace mojo { | 34 namespace mojo { |
27 namespace { | 35 namespace { |
(...skipping 226 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
254 } | 262 } |
255 | 263 |
256 scoped_refptr<net::IOBufferWithSize> read_buffer_; | 264 scoped_refptr<net::IOBufferWithSize> read_buffer_; |
257 scoped_refptr<net::DrainableIOBuffer> write_buffer_; | 265 scoped_refptr<net::DrainableIOBuffer> write_buffer_; |
258 scoped_ptr<net::TCPClientSocket> socket_; | 266 scoped_ptr<net::TCPClientSocket> socket_; |
259 int connect_result_; | 267 int connect_result_; |
260 | 268 |
261 DISALLOW_COPY_AND_ASSIGN(TestHttpClient); | 269 DISALLOW_COPY_AND_ASSIGN(TestHttpClient); |
262 }; | 270 }; |
263 | 271 |
| 272 class WebSocketClientImpl : public WebSocketClient { |
| 273 public: |
| 274 explicit WebSocketClientImpl() |
| 275 : binding_(this, &client_ptr_), |
| 276 wait_for_message_count_(0), |
| 277 run_loop_(nullptr) {} |
| 278 ~WebSocketClientImpl() override {} |
| 279 |
| 280 // Establishes a connection from the client side. |
| 281 void Connect(WebSocketPtr web_socket, const std::string& url) { |
| 282 web_socket_ = web_socket.Pass(); |
| 283 |
| 284 DataPipe data_pipe; |
| 285 send_stream_ = data_pipe.producer_handle.Pass(); |
| 286 write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get())); |
| 287 |
| 288 web_socket_->Connect(url, Array<String>(0), "http://example.com", |
| 289 data_pipe.consumer_handle.Pass(), client_ptr_.Pass()); |
| 290 } |
| 291 |
| 292 // Establishes a connection from the server side. |
| 293 void AcceptConnectRequest( |
| 294 const HttpConnectionDelegate::OnReceivedWebSocketRequestCallback& |
| 295 callback) { |
| 296 InterfaceRequest<WebSocket> web_socket_request = GetProxy(&web_socket_); |
| 297 |
| 298 DataPipe data_pipe; |
| 299 send_stream_ = data_pipe.producer_handle.Pass(); |
| 300 write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get())); |
| 301 |
| 302 callback.Run(web_socket_request.Pass(), data_pipe.consumer_handle.Pass(), |
| 303 client_ptr_.Pass()); |
| 304 } |
| 305 |
| 306 void WaitForConnectCompletion() { |
| 307 DCHECK(!run_loop_); |
| 308 |
| 309 if (receive_stream_.is_valid()) |
| 310 return; |
| 311 |
| 312 base::RunLoop run_loop; |
| 313 run_loop_ = &run_loop; |
| 314 run_loop.Run(); |
| 315 run_loop_ = nullptr; |
| 316 } |
| 317 |
| 318 void Send(const std::string& message) { |
| 319 DCHECK(!message.empty()); |
| 320 |
| 321 uint32_t size = static_cast<uint32_t>(message.size()); |
| 322 write_send_stream_->Write( |
| 323 &message[0], size, |
| 324 base::Bind(&WebSocketClientImpl::OnFinishedWritingSendStream, |
| 325 base::Unretained(this), size)); |
| 326 } |
| 327 |
| 328 void WaitForMessage(size_t count) { |
| 329 DCHECK(!run_loop_); |
| 330 |
| 331 if (received_messages_.size() >= count) |
| 332 return; |
| 333 wait_for_message_count_ = count; |
| 334 base::RunLoop run_loop; |
| 335 run_loop_ = &run_loop; |
| 336 run_loop.Run(); |
| 337 run_loop_ = nullptr; |
| 338 } |
| 339 |
| 340 std::vector<std::string>& received_messages() { return received_messages_; } |
| 341 |
| 342 private: |
| 343 // WebSocketClient implementation. |
| 344 void DidConnect(const String& selected_subprotocol, |
| 345 const String& extensions, |
| 346 ScopedDataPipeConsumerHandle receive_stream) override { |
| 347 receive_stream_ = receive_stream.Pass(); |
| 348 read_receive_stream_.reset(new WebSocketReadQueue(receive_stream_.get())); |
| 349 |
| 350 web_socket_->FlowControl(2048); |
| 351 if (run_loop_) |
| 352 run_loop_->Quit(); |
| 353 } |
| 354 |
| 355 void DidReceiveData(bool fin, |
| 356 WebSocket::MessageType type, |
| 357 uint32_t num_bytes) override { |
| 358 DCHECK(num_bytes > 0); |
| 359 |
| 360 read_receive_stream_->Read( |
| 361 num_bytes, |
| 362 base::Bind(&WebSocketClientImpl::OnFinishedReadingReceiveStream, |
| 363 base::Unretained(this), num_bytes)); |
| 364 } |
| 365 |
| 366 void DidReceiveFlowControl(int64_t quota) override {} |
| 367 |
| 368 void DidFail(const String& message) override {} |
| 369 |
| 370 void DidClose(bool was_clean, uint16_t code, const String& reason) override {} |
| 371 |
| 372 void OnFinishedWritingSendStream(uint32_t num_bytes, const char* buffer) { |
| 373 EXPECT_TRUE(buffer); |
| 374 |
| 375 web_socket_->Send(true, WebSocket::MESSAGE_TYPE_TEXT, num_bytes); |
| 376 } |
| 377 |
| 378 void OnFinishedReadingReceiveStream(uint32_t num_bytes, const char* data) { |
| 379 EXPECT_TRUE(data); |
| 380 |
| 381 received_messages_.push_back(std::string(data, num_bytes)); |
| 382 if (run_loop_ && received_messages_.size() >= wait_for_message_count_) { |
| 383 wait_for_message_count_ = 0; |
| 384 run_loop_->Quit(); |
| 385 } |
| 386 } |
| 387 |
| 388 WebSocketClientPtr client_ptr_; |
| 389 Binding<WebSocketClient> binding_; |
| 390 WebSocketPtr web_socket_; |
| 391 |
| 392 ScopedDataPipeProducerHandle send_stream_; |
| 393 scoped_ptr<WebSocketWriteQueue> write_send_stream_; |
| 394 |
| 395 ScopedDataPipeConsumerHandle receive_stream_; |
| 396 scoped_ptr<WebSocketReadQueue> read_receive_stream_; |
| 397 |
| 398 std::vector<std::string> received_messages_; |
| 399 size_t wait_for_message_count_; |
| 400 |
| 401 // Pointing to a stack-allocated RunLoop instance. |
| 402 base::RunLoop* run_loop_; |
| 403 |
| 404 DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl); |
| 405 }; |
| 406 |
264 class HttpConnectionDelegateImpl : public HttpConnectionDelegate { | 407 class HttpConnectionDelegateImpl : public HttpConnectionDelegate { |
265 public: | 408 public: |
266 struct PendingRequest { | 409 struct PendingRequest { |
267 HttpRequestPtr request; | 410 HttpRequestPtr request; |
268 OnReceivedRequestCallback callback; | 411 OnReceivedRequestCallback callback; |
269 }; | 412 }; |
270 | 413 |
271 HttpConnectionDelegateImpl(HttpConnectionPtr connection, | 414 HttpConnectionDelegateImpl(HttpConnectionPtr connection, |
272 InterfaceRequest<HttpConnectionDelegate> request) | 415 InterfaceRequest<HttpConnectionDelegate> request) |
273 : connection_(connection.Pass()), | 416 : connection_(connection.Pass()), |
274 binding_(this, request.Pass()), | 417 binding_(this, request.Pass()), |
275 run_loop_(nullptr), | 418 wait_for_request_count_(0), |
276 wait_for_request_count_(0) {} | 419 run_loop_(nullptr) {} |
277 ~HttpConnectionDelegateImpl() override {} | 420 ~HttpConnectionDelegateImpl() override {} |
278 | 421 |
279 // HttpConnectionDelegate implementation: | 422 // HttpConnectionDelegate implementation: |
280 void OnReceivedRequest(HttpRequestPtr request, | 423 void OnReceivedRequest(HttpRequestPtr request, |
281 const OnReceivedRequestCallback& callback) override { | 424 const OnReceivedRequestCallback& callback) override { |
282 linked_ptr<PendingRequest> pending_request(new PendingRequest); | 425 linked_ptr<PendingRequest> pending_request(new PendingRequest); |
283 pending_request->request = request.Pass(); | 426 pending_request->request = request.Pass(); |
284 pending_request->callback = callback; | 427 pending_request->callback = callback; |
285 pending_requests_.push_back(pending_request); | 428 pending_requests_.push_back(pending_request); |
286 if (run_loop_ && pending_requests_.size() >= wait_for_request_count_) { | 429 if (run_loop_ && pending_requests_.size() >= wait_for_request_count_) { |
287 wait_for_request_count_ = 0; | 430 wait_for_request_count_ = 0; |
288 run_loop_->Quit(); | 431 run_loop_->Quit(); |
289 } | 432 } |
290 } | 433 } |
291 | 434 |
292 void OnReceivedWebSocketRequest( | 435 void OnReceivedWebSocketRequest( |
293 HttpRequestPtr request, | 436 HttpRequestPtr request, |
294 const OnReceivedWebSocketRequestCallback& callback) override { | 437 const OnReceivedWebSocketRequestCallback& callback) override { |
295 NOTREACHED(); | 438 web_socket_.reset(new WebSocketClientImpl()); |
| 439 |
| 440 web_socket_->AcceptConnectRequest(callback); |
| 441 |
| 442 if (run_loop_) |
| 443 run_loop_->Quit(); |
296 } | 444 } |
297 | 445 |
298 void SendResponse(HttpResponsePtr response) { | 446 void SendResponse(HttpResponsePtr response) { |
299 ASSERT_FALSE(pending_requests_.empty()); | 447 ASSERT_FALSE(pending_requests_.empty()); |
300 linked_ptr<PendingRequest> request = pending_requests_[0]; | 448 linked_ptr<PendingRequest> request = pending_requests_[0]; |
301 pending_requests_.erase(pending_requests_.begin()); | 449 pending_requests_.erase(pending_requests_.begin()); |
302 request->callback.Run(response.Pass()); | 450 request->callback.Run(response.Pass()); |
303 } | 451 } |
304 | 452 |
305 void WaitForRequest(size_t count) { | 453 void WaitForRequest(size_t count) { |
306 DCHECK(!run_loop_); | 454 DCHECK(!run_loop_); |
307 | 455 |
| 456 if (pending_requests_.size() >= count) |
| 457 return; |
| 458 |
308 wait_for_request_count_ = count; | 459 wait_for_request_count_ = count; |
309 base::RunLoop run_loop; | 460 base::RunLoop run_loop; |
310 run_loop_ = &run_loop; | 461 run_loop_ = &run_loop; |
311 run_loop.Run(); | 462 run_loop.Run(); |
312 run_loop_ = nullptr; | 463 run_loop_ = nullptr; |
313 } | 464 } |
314 | 465 |
| 466 void WaitForWebSocketRequest() { |
| 467 DCHECK(!run_loop_); |
| 468 |
| 469 if (web_socket_) |
| 470 return; |
| 471 |
| 472 base::RunLoop run_loop; |
| 473 run_loop_ = &run_loop; |
| 474 run_loop.Run(); |
| 475 run_loop_ = nullptr; |
| 476 } |
| 477 |
315 std::vector<linked_ptr<PendingRequest>>& pending_requests() { | 478 std::vector<linked_ptr<PendingRequest>>& pending_requests() { |
316 return pending_requests_; | 479 return pending_requests_; |
317 } | 480 } |
318 | 481 |
| 482 WebSocketClientImpl* web_socket() { return web_socket_.get(); } |
| 483 |
319 private: | 484 private: |
320 HttpConnectionPtr connection_; | 485 HttpConnectionPtr connection_; |
321 Binding<HttpConnectionDelegate> binding_; | 486 Binding<HttpConnectionDelegate> binding_; |
322 std::vector<linked_ptr<PendingRequest>> pending_requests_; | 487 std::vector<linked_ptr<PendingRequest>> pending_requests_; |
| 488 size_t wait_for_request_count_; |
| 489 scoped_ptr<WebSocketClientImpl> web_socket_; |
| 490 |
323 // Pointing to a stack-allocated RunLoop instance. | 491 // Pointing to a stack-allocated RunLoop instance. |
324 base::RunLoop* run_loop_; | 492 base::RunLoop* run_loop_; |
325 size_t wait_for_request_count_; | |
326 | 493 |
327 DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl); | 494 DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl); |
328 }; | 495 }; |
329 | 496 |
330 class HttpServerDelegateImpl : public HttpServerDelegate { | 497 class HttpServerDelegateImpl : public HttpServerDelegate { |
331 public: | 498 public: |
332 explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr) | 499 explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr) |
333 : binding_(this, delegate_ptr), | 500 : binding_(this, delegate_ptr), |
334 run_loop_(nullptr), | 501 wait_for_connection_count_(0), |
335 wait_for_connection_count_(0) {} | 502 run_loop_(nullptr) {} |
336 ~HttpServerDelegateImpl() override {} | 503 ~HttpServerDelegateImpl() override {} |
337 | 504 |
338 // HttpServerDelegate implementation. | 505 // HttpServerDelegate implementation. |
339 void OnConnected(HttpConnectionPtr connection, | 506 void OnConnected(HttpConnectionPtr connection, |
340 InterfaceRequest<HttpConnectionDelegate> delegate) override { | 507 InterfaceRequest<HttpConnectionDelegate> delegate) override { |
341 connections_.push_back(make_linked_ptr( | 508 connections_.push_back(make_linked_ptr( |
342 new HttpConnectionDelegateImpl(connection.Pass(), delegate.Pass()))); | 509 new HttpConnectionDelegateImpl(connection.Pass(), delegate.Pass()))); |
343 if (run_loop_ && connections_.size() >= wait_for_connection_count_) { | 510 if (run_loop_ && connections_.size() >= wait_for_connection_count_) { |
344 wait_for_connection_count_ = 0; | 511 wait_for_connection_count_ = 0; |
345 run_loop_->Quit(); | 512 run_loop_->Quit(); |
346 } | 513 } |
347 } | 514 } |
348 | 515 |
349 void WaitForConnection(size_t count) { | 516 void WaitForConnection(size_t count) { |
350 DCHECK(!run_loop_); | 517 DCHECK(!run_loop_); |
351 | 518 |
| 519 if (connections_.size() >= count) |
| 520 return; |
| 521 |
352 wait_for_connection_count_ = count; | 522 wait_for_connection_count_ = count; |
353 base::RunLoop run_loop; | 523 base::RunLoop run_loop; |
354 run_loop_ = &run_loop; | 524 run_loop_ = &run_loop; |
355 run_loop.Run(); | 525 run_loop.Run(); |
356 run_loop_ = nullptr; | 526 run_loop_ = nullptr; |
357 } | 527 } |
358 | 528 |
359 std::vector<linked_ptr<HttpConnectionDelegateImpl>>& connections() { | 529 std::vector<linked_ptr<HttpConnectionDelegateImpl>>& connections() { |
360 return connections_; | 530 return connections_; |
361 } | 531 } |
362 | 532 |
363 private: | 533 private: |
364 Binding<HttpServerDelegate> binding_; | 534 Binding<HttpServerDelegate> binding_; |
365 std::vector<linked_ptr<HttpConnectionDelegateImpl>> connections_; | 535 std::vector<linked_ptr<HttpConnectionDelegateImpl>> connections_; |
| 536 size_t wait_for_connection_count_; |
366 // Pointing to a stack-allocated RunLoop instance. | 537 // Pointing to a stack-allocated RunLoop instance. |
367 base::RunLoop* run_loop_; | 538 base::RunLoop* run_loop_; |
368 size_t wait_for_connection_count_; | |
369 | 539 |
370 DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl); | 540 DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl); |
371 }; | 541 }; |
372 | 542 |
373 class HttpServerAppTest : public test::ApplicationTestBase { | 543 class HttpServerAppTest : public test::ApplicationTestBase { |
374 public: | 544 public: |
375 HttpServerAppTest() : message_loop_(base::MessageLoop::TYPE_IO) {} | 545 HttpServerAppTest() : message_loop_(base::MessageLoop::TYPE_IO) {} |
376 ~HttpServerAppTest() override {} | 546 ~HttpServerAppTest() override {} |
377 | 547 |
378 protected: | 548 protected: |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
471 {{"Content-Length", "26"}}, | 641 {{"Content-Length", "26"}}, |
472 make_scoped_ptr(new std::string("This is a test response..."))}; | 642 make_scoped_ptr(new std::string("This is a test response..."))}; |
473 connection.SendResponse(MakeResponseStruct(response_data)); | 643 connection.SendResponse(MakeResponseStruct(response_data)); |
474 | 644 |
475 std::string response_message; | 645 std::string response_message; |
476 client.ReadResponse(&response_message); | 646 client.ReadResponse(&response_message); |
477 | 647 |
478 CheckResponse(response_data, response_message); | 648 CheckResponse(response_data, response_message); |
479 } | 649 } |
480 | 650 |
| 651 TEST_F(HttpServerAppTest, WebSocket) { |
| 652 NetAddressPtr bound_to; |
| 653 HttpServerDelegatePtr server_delegate_ptr; |
| 654 HttpServerDelegateImpl server_delegate_impl(&server_delegate_ptr); |
| 655 CreateHttpServer(server_delegate_ptr.Pass(), &bound_to); |
| 656 |
| 657 WebSocketPtr web_socket_ptr; |
| 658 network_service_->CreateWebSocket(GetProxy(&web_socket_ptr)); |
| 659 WebSocketClientImpl socket_0; |
| 660 socket_0.Connect( |
| 661 web_socket_ptr.Pass(), |
| 662 base::StringPrintf("ws://127.0.0.1:%d/hello", bound_to->ipv4->port)); |
| 663 |
| 664 server_delegate_impl.WaitForConnection(1); |
| 665 HttpConnectionDelegateImpl& connection = |
| 666 *server_delegate_impl.connections()[0]; |
| 667 |
| 668 connection.WaitForWebSocketRequest(); |
| 669 WebSocketClientImpl& socket_1 = *connection.web_socket(); |
| 670 |
| 671 socket_1.WaitForConnectCompletion(); |
| 672 socket_0.WaitForConnectCompletion(); |
| 673 |
| 674 socket_0.Send("Hello"); |
| 675 socket_0.Send("world!"); |
| 676 |
| 677 socket_1.WaitForMessage(2); |
| 678 EXPECT_EQ("Hello", socket_1.received_messages()[0]); |
| 679 EXPECT_EQ("world!", socket_1.received_messages()[1]); |
| 680 |
| 681 socket_1.Send("How do"); |
| 682 socket_1.Send("you do?"); |
| 683 |
| 684 socket_0.WaitForMessage(2); |
| 685 EXPECT_EQ("How do", socket_0.received_messages()[0]); |
| 686 EXPECT_EQ("you do?", socket_0.received_messages()[1]); |
| 687 } |
| 688 |
481 } // namespace mojo | 689 } // namespace mojo |
OLD | NEW |