Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1040)

Side by Side Diff: mojo/services/network/http_server_apptest.cc

Issue 1144843002: Mojo service implementation for HTTP server - part 3 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase & resolve Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/logging.h"
5 #include "base/macros.h" 6 #include "base/macros.h"
6 #include "base/memory/linked_ptr.h" 7 #include "base/memory/linked_ptr.h"
7 #include "base/memory/ref_counted.h" 8 #include "base/memory/ref_counted.h"
8 #include "base/memory/scoped_ptr.h" 9 #include "base/memory/scoped_ptr.h"
9 #include "base/run_loop.h" 10 #include "base/run_loop.h"
10 #include "base/strings/string_util.h" 11 #include "base/strings/string_util.h"
12 #include "base/strings/stringprintf.h"
11 #include "mojo/application/public/cpp/application_connection.h" 13 #include "mojo/application/public/cpp/application_connection.h"
12 #include "mojo/application/public/cpp/application_impl.h" 14 #include "mojo/application/public/cpp/application_impl.h"
13 #include "mojo/application/public/cpp/application_test_base.h" 15 #include "mojo/application/public/cpp/application_test_base.h"
14 #include "mojo/common/data_pipe_utils.h" 16 #include "mojo/common/data_pipe_utils.h"
15 #include "mojo/services/network/net_address_type_converters.h" 17 #include "mojo/services/network/net_address_type_converters.h"
18 #include "mojo/services/network/public/cpp/web_socket_read_queue.h"
19 #include "mojo/services/network/public/cpp/web_socket_write_queue.h"
20 #include "mojo/services/network/public/interfaces/http_connection.mojom.h"
21 #include "mojo/services/network/public/interfaces/http_message.mojom.h"
16 #include "mojo/services/network/public/interfaces/http_server.mojom.h" 22 #include "mojo/services/network/public/interfaces/http_server.mojom.h"
23 #include "mojo/services/network/public/interfaces/net_address.mojom.h"
17 #include "mojo/services/network/public/interfaces/network_service.mojom.h" 24 #include "mojo/services/network/public/interfaces/network_service.mojom.h"
25 #include "mojo/services/network/public/interfaces/web_socket.mojom.h"
18 #include "net/base/io_buffer.h" 26 #include "net/base/io_buffer.h"
19 #include "net/base/net_errors.h" 27 #include "net/base/net_errors.h"
20 #include "net/base/test_completion_callback.h" 28 #include "net/base/test_completion_callback.h"
21 #include "net/http/http_response_headers.h" 29 #include "net/http/http_response_headers.h"
22 #include "net/http/http_util.h" 30 #include "net/http/http_util.h"
23 #include "net/socket/tcp_client_socket.h" 31 #include "net/socket/tcp_client_socket.h"
24 #include "testing/gtest/include/gtest/gtest.h" 32 #include "testing/gtest/include/gtest/gtest.h"
25 33
26 namespace mojo { 34 namespace mojo {
27 namespace { 35 namespace {
(...skipping 226 matching lines...) Expand 10 before | Expand all | Expand 10 after
254 } 262 }
255 263
256 scoped_refptr<net::IOBufferWithSize> read_buffer_; 264 scoped_refptr<net::IOBufferWithSize> read_buffer_;
257 scoped_refptr<net::DrainableIOBuffer> write_buffer_; 265 scoped_refptr<net::DrainableIOBuffer> write_buffer_;
258 scoped_ptr<net::TCPClientSocket> socket_; 266 scoped_ptr<net::TCPClientSocket> socket_;
259 int connect_result_; 267 int connect_result_;
260 268
261 DISALLOW_COPY_AND_ASSIGN(TestHttpClient); 269 DISALLOW_COPY_AND_ASSIGN(TestHttpClient);
262 }; 270 };
263 271
272 class WebSocketClientImpl : public WebSocketClient {
273 public:
274 explicit WebSocketClientImpl()
275 : binding_(this, &client_ptr_),
276 wait_for_message_count_(0),
277 run_loop_(nullptr) {}
278 ~WebSocketClientImpl() override {}
279
280 // Establishes a connection from the client side.
281 void Connect(WebSocketPtr web_socket, const std::string& url) {
282 web_socket_ = web_socket.Pass();
283
284 DataPipe data_pipe;
285 send_stream_ = data_pipe.producer_handle.Pass();
286 write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get()));
287
288 web_socket_->Connect(url, Array<String>(0), "http://example.com",
289 data_pipe.consumer_handle.Pass(), client_ptr_.Pass());
290 }
291
292 // Establishes a connection from the server side.
293 void AcceptConnectRequest(
294 const HttpConnectionDelegate::OnReceivedWebSocketRequestCallback&
295 callback) {
296 InterfaceRequest<WebSocket> web_socket_request = GetProxy(&web_socket_);
297
298 DataPipe data_pipe;
299 send_stream_ = data_pipe.producer_handle.Pass();
300 write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get()));
301
302 callback.Run(web_socket_request.Pass(), data_pipe.consumer_handle.Pass(),
303 client_ptr_.Pass());
304 }
305
306 void WaitForConnectCompletion() {
307 DCHECK(!run_loop_);
308
309 if (receive_stream_.is_valid())
310 return;
311
312 base::RunLoop run_loop;
313 run_loop_ = &run_loop;
314 run_loop.Run();
315 run_loop_ = nullptr;
316 }
317
318 void Send(const std::string& message) {
319 DCHECK(!message.empty());
320
321 uint32_t size = static_cast<uint32_t>(message.size());
322 write_send_stream_->Write(
323 &message[0], size,
324 base::Bind(&WebSocketClientImpl::OnFinishedWritingSendStream,
325 base::Unretained(this), size));
326 }
327
328 void WaitForMessage(size_t count) {
329 DCHECK(!run_loop_);
330
331 if (received_messages_.size() >= count)
332 return;
333 wait_for_message_count_ = count;
334 base::RunLoop run_loop;
335 run_loop_ = &run_loop;
336 run_loop.Run();
337 run_loop_ = nullptr;
338 }
339
340 std::vector<std::string>& received_messages() { return received_messages_; }
341
342 private:
343 // WebSocketClient implementation.
344 void DidConnect(const String& selected_subprotocol,
345 const String& extensions,
346 ScopedDataPipeConsumerHandle receive_stream) override {
347 receive_stream_ = receive_stream.Pass();
348 read_receive_stream_.reset(new WebSocketReadQueue(receive_stream_.get()));
349
350 web_socket_->FlowControl(2048);
351 if (run_loop_)
352 run_loop_->Quit();
353 }
354
355 void DidReceiveData(bool fin,
356 WebSocket::MessageType type,
357 uint32_t num_bytes) override {
358 DCHECK(num_bytes > 0);
359
360 read_receive_stream_->Read(
361 num_bytes,
362 base::Bind(&WebSocketClientImpl::OnFinishedReadingReceiveStream,
363 base::Unretained(this), num_bytes));
364 }
365
366 void DidReceiveFlowControl(int64_t quota) override {}
367
368 void DidFail(const String& message) override {}
369
370 void DidClose(bool was_clean, uint16_t code, const String& reason) override {}
371
372 void OnFinishedWritingSendStream(uint32_t num_bytes, const char* buffer) {
373 EXPECT_TRUE(buffer);
374
375 web_socket_->Send(true, WebSocket::MESSAGE_TYPE_TEXT, num_bytes);
376 }
377
378 void OnFinishedReadingReceiveStream(uint32_t num_bytes, const char* data) {
379 EXPECT_TRUE(data);
380
381 received_messages_.push_back(std::string(data, num_bytes));
382 if (run_loop_ && received_messages_.size() >= wait_for_message_count_) {
383 wait_for_message_count_ = 0;
384 run_loop_->Quit();
385 }
386 }
387
388 WebSocketClientPtr client_ptr_;
389 Binding<WebSocketClient> binding_;
390 WebSocketPtr web_socket_;
391
392 ScopedDataPipeProducerHandle send_stream_;
393 scoped_ptr<WebSocketWriteQueue> write_send_stream_;
394
395 ScopedDataPipeConsumerHandle receive_stream_;
396 scoped_ptr<WebSocketReadQueue> read_receive_stream_;
397
398 std::vector<std::string> received_messages_;
399 size_t wait_for_message_count_;
400
401 // Pointing to a stack-allocated RunLoop instance.
402 base::RunLoop* run_loop_;
403
404 DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl);
405 };
406
264 class HttpConnectionDelegateImpl : public HttpConnectionDelegate { 407 class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
265 public: 408 public:
266 struct PendingRequest { 409 struct PendingRequest {
267 HttpRequestPtr request; 410 HttpRequestPtr request;
268 OnReceivedRequestCallback callback; 411 OnReceivedRequestCallback callback;
269 }; 412 };
270 413
271 HttpConnectionDelegateImpl(HttpConnectionPtr connection, 414 HttpConnectionDelegateImpl(HttpConnectionPtr connection,
272 InterfaceRequest<HttpConnectionDelegate> request) 415 InterfaceRequest<HttpConnectionDelegate> request)
273 : connection_(connection.Pass()), 416 : connection_(connection.Pass()),
274 binding_(this, request.Pass()), 417 binding_(this, request.Pass()),
275 run_loop_(nullptr), 418 wait_for_request_count_(0),
276 wait_for_request_count_(0) {} 419 run_loop_(nullptr) {}
277 ~HttpConnectionDelegateImpl() override {} 420 ~HttpConnectionDelegateImpl() override {}
278 421
279 // HttpConnectionDelegate implementation: 422 // HttpConnectionDelegate implementation:
280 void OnReceivedRequest(HttpRequestPtr request, 423 void OnReceivedRequest(HttpRequestPtr request,
281 const OnReceivedRequestCallback& callback) override { 424 const OnReceivedRequestCallback& callback) override {
282 linked_ptr<PendingRequest> pending_request(new PendingRequest); 425 linked_ptr<PendingRequest> pending_request(new PendingRequest);
283 pending_request->request = request.Pass(); 426 pending_request->request = request.Pass();
284 pending_request->callback = callback; 427 pending_request->callback = callback;
285 pending_requests_.push_back(pending_request); 428 pending_requests_.push_back(pending_request);
286 if (run_loop_ && pending_requests_.size() >= wait_for_request_count_) { 429 if (run_loop_ && pending_requests_.size() >= wait_for_request_count_) {
287 wait_for_request_count_ = 0; 430 wait_for_request_count_ = 0;
288 run_loop_->Quit(); 431 run_loop_->Quit();
289 } 432 }
290 } 433 }
291 434
292 void OnReceivedWebSocketRequest( 435 void OnReceivedWebSocketRequest(
293 HttpRequestPtr request, 436 HttpRequestPtr request,
294 const OnReceivedWebSocketRequestCallback& callback) override { 437 const OnReceivedWebSocketRequestCallback& callback) override {
295 NOTREACHED(); 438 web_socket_.reset(new WebSocketClientImpl());
439
440 web_socket_->AcceptConnectRequest(callback);
441
442 if (run_loop_)
443 run_loop_->Quit();
296 } 444 }
297 445
298 void SendResponse(HttpResponsePtr response) { 446 void SendResponse(HttpResponsePtr response) {
299 ASSERT_FALSE(pending_requests_.empty()); 447 ASSERT_FALSE(pending_requests_.empty());
300 linked_ptr<PendingRequest> request = pending_requests_[0]; 448 linked_ptr<PendingRequest> request = pending_requests_[0];
301 pending_requests_.erase(pending_requests_.begin()); 449 pending_requests_.erase(pending_requests_.begin());
302 request->callback.Run(response.Pass()); 450 request->callback.Run(response.Pass());
303 } 451 }
304 452
305 void WaitForRequest(size_t count) { 453 void WaitForRequest(size_t count) {
306 DCHECK(!run_loop_); 454 DCHECK(!run_loop_);
307 455
456 if (pending_requests_.size() >= count)
457 return;
458
308 wait_for_request_count_ = count; 459 wait_for_request_count_ = count;
309 base::RunLoop run_loop; 460 base::RunLoop run_loop;
310 run_loop_ = &run_loop; 461 run_loop_ = &run_loop;
311 run_loop.Run(); 462 run_loop.Run();
312 run_loop_ = nullptr; 463 run_loop_ = nullptr;
313 } 464 }
314 465
466 void WaitForWebSocketRequest() {
467 DCHECK(!run_loop_);
468
469 if (web_socket_)
470 return;
471
472 base::RunLoop run_loop;
473 run_loop_ = &run_loop;
474 run_loop.Run();
475 run_loop_ = nullptr;
476 }
477
315 std::vector<linked_ptr<PendingRequest>>& pending_requests() { 478 std::vector<linked_ptr<PendingRequest>>& pending_requests() {
316 return pending_requests_; 479 return pending_requests_;
317 } 480 }
318 481
482 WebSocketClientImpl* web_socket() { return web_socket_.get(); }
483
319 private: 484 private:
320 HttpConnectionPtr connection_; 485 HttpConnectionPtr connection_;
321 Binding<HttpConnectionDelegate> binding_; 486 Binding<HttpConnectionDelegate> binding_;
322 std::vector<linked_ptr<PendingRequest>> pending_requests_; 487 std::vector<linked_ptr<PendingRequest>> pending_requests_;
488 size_t wait_for_request_count_;
489 scoped_ptr<WebSocketClientImpl> web_socket_;
490
323 // Pointing to a stack-allocated RunLoop instance. 491 // Pointing to a stack-allocated RunLoop instance.
324 base::RunLoop* run_loop_; 492 base::RunLoop* run_loop_;
325 size_t wait_for_request_count_;
326 493
327 DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl); 494 DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl);
328 }; 495 };
329 496
330 class HttpServerDelegateImpl : public HttpServerDelegate { 497 class HttpServerDelegateImpl : public HttpServerDelegate {
331 public: 498 public:
332 explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr) 499 explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr)
333 : binding_(this, delegate_ptr), 500 : binding_(this, delegate_ptr),
334 run_loop_(nullptr), 501 wait_for_connection_count_(0),
335 wait_for_connection_count_(0) {} 502 run_loop_(nullptr) {}
336 ~HttpServerDelegateImpl() override {} 503 ~HttpServerDelegateImpl() override {}
337 504
338 // HttpServerDelegate implementation. 505 // HttpServerDelegate implementation.
339 void OnConnected(HttpConnectionPtr connection, 506 void OnConnected(HttpConnectionPtr connection,
340 InterfaceRequest<HttpConnectionDelegate> delegate) override { 507 InterfaceRequest<HttpConnectionDelegate> delegate) override {
341 connections_.push_back(make_linked_ptr( 508 connections_.push_back(make_linked_ptr(
342 new HttpConnectionDelegateImpl(connection.Pass(), delegate.Pass()))); 509 new HttpConnectionDelegateImpl(connection.Pass(), delegate.Pass())));
343 if (run_loop_ && connections_.size() >= wait_for_connection_count_) { 510 if (run_loop_ && connections_.size() >= wait_for_connection_count_) {
344 wait_for_connection_count_ = 0; 511 wait_for_connection_count_ = 0;
345 run_loop_->Quit(); 512 run_loop_->Quit();
346 } 513 }
347 } 514 }
348 515
349 void WaitForConnection(size_t count) { 516 void WaitForConnection(size_t count) {
350 DCHECK(!run_loop_); 517 DCHECK(!run_loop_);
351 518
519 if (connections_.size() >= count)
520 return;
521
352 wait_for_connection_count_ = count; 522 wait_for_connection_count_ = count;
353 base::RunLoop run_loop; 523 base::RunLoop run_loop;
354 run_loop_ = &run_loop; 524 run_loop_ = &run_loop;
355 run_loop.Run(); 525 run_loop.Run();
356 run_loop_ = nullptr; 526 run_loop_ = nullptr;
357 } 527 }
358 528
359 std::vector<linked_ptr<HttpConnectionDelegateImpl>>& connections() { 529 std::vector<linked_ptr<HttpConnectionDelegateImpl>>& connections() {
360 return connections_; 530 return connections_;
361 } 531 }
362 532
363 private: 533 private:
364 Binding<HttpServerDelegate> binding_; 534 Binding<HttpServerDelegate> binding_;
365 std::vector<linked_ptr<HttpConnectionDelegateImpl>> connections_; 535 std::vector<linked_ptr<HttpConnectionDelegateImpl>> connections_;
536 size_t wait_for_connection_count_;
366 // Pointing to a stack-allocated RunLoop instance. 537 // Pointing to a stack-allocated RunLoop instance.
367 base::RunLoop* run_loop_; 538 base::RunLoop* run_loop_;
368 size_t wait_for_connection_count_;
369 539
370 DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl); 540 DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl);
371 }; 541 };
372 542
373 class HttpServerAppTest : public test::ApplicationTestBase { 543 class HttpServerAppTest : public test::ApplicationTestBase {
374 public: 544 public:
375 HttpServerAppTest() : message_loop_(base::MessageLoop::TYPE_IO) {} 545 HttpServerAppTest() : message_loop_(base::MessageLoop::TYPE_IO) {}
376 ~HttpServerAppTest() override {} 546 ~HttpServerAppTest() override {}
377 547
378 protected: 548 protected:
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
471 {{"Content-Length", "26"}}, 641 {{"Content-Length", "26"}},
472 make_scoped_ptr(new std::string("This is a test response..."))}; 642 make_scoped_ptr(new std::string("This is a test response..."))};
473 connection.SendResponse(MakeResponseStruct(response_data)); 643 connection.SendResponse(MakeResponseStruct(response_data));
474 644
475 std::string response_message; 645 std::string response_message;
476 client.ReadResponse(&response_message); 646 client.ReadResponse(&response_message);
477 647
478 CheckResponse(response_data, response_message); 648 CheckResponse(response_data, response_message);
479 } 649 }
480 650
651 TEST_F(HttpServerAppTest, WebSocket) {
652 NetAddressPtr bound_to;
653 HttpServerDelegatePtr server_delegate_ptr;
654 HttpServerDelegateImpl server_delegate_impl(&server_delegate_ptr);
655 CreateHttpServer(server_delegate_ptr.Pass(), &bound_to);
656
657 WebSocketPtr web_socket_ptr;
658 network_service_->CreateWebSocket(GetProxy(&web_socket_ptr));
659 WebSocketClientImpl socket_0;
660 socket_0.Connect(
661 web_socket_ptr.Pass(),
662 base::StringPrintf("ws://127.0.0.1:%d/hello", bound_to->ipv4->port));
663
664 server_delegate_impl.WaitForConnection(1);
665 HttpConnectionDelegateImpl& connection =
666 *server_delegate_impl.connections()[0];
667
668 connection.WaitForWebSocketRequest();
669 WebSocketClientImpl& socket_1 = *connection.web_socket();
670
671 socket_1.WaitForConnectCompletion();
672 socket_0.WaitForConnectCompletion();
673
674 socket_0.Send("Hello");
675 socket_0.Send("world!");
676
677 socket_1.WaitForMessage(2);
678 EXPECT_EQ("Hello", socket_1.received_messages()[0]);
679 EXPECT_EQ("world!", socket_1.received_messages()[1]);
680
681 socket_1.Send("How do");
682 socket_1.Send("you do?");
683
684 socket_0.WaitForMessage(2);
685 EXPECT_EQ("How do", socket_0.received_messages()[0]);
686 EXPECT_EQ("you do?", socket_0.received_messages()[1]);
687 }
688
481 } // namespace mojo 689 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/services/network/http_connection_impl.cc ('k') | mojo/services/network/public/interfaces/http_connection.mojom » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698