| Index: mojo/services/network/http_server_apptest.cc
|
| diff --git a/mojo/services/network/http_server_apptest.cc b/mojo/services/network/http_server_apptest.cc
|
| index a17fcf137480e277984e3c7c209e5716c0482df0..f46d1ebac976e2d9781ce40abe65abe9b980a590 100644
|
| --- a/mojo/services/network/http_server_apptest.cc
|
| +++ b/mojo/services/network/http_server_apptest.cc
|
| @@ -2,19 +2,27 @@
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
|
|
| +#include "base/logging.h"
|
| #include "base/macros.h"
|
| #include "base/memory/linked_ptr.h"
|
| #include "base/memory/ref_counted.h"
|
| #include "base/memory/scoped_ptr.h"
|
| #include "base/run_loop.h"
|
| #include "base/strings/string_util.h"
|
| +#include "base/strings/stringprintf.h"
|
| #include "mojo/application/public/cpp/application_connection.h"
|
| #include "mojo/application/public/cpp/application_impl.h"
|
| #include "mojo/application/public/cpp/application_test_base.h"
|
| #include "mojo/common/data_pipe_utils.h"
|
| #include "mojo/services/network/net_address_type_converters.h"
|
| +#include "mojo/services/network/public/cpp/web_socket_read_queue.h"
|
| +#include "mojo/services/network/public/cpp/web_socket_write_queue.h"
|
| +#include "mojo/services/network/public/interfaces/http_connection.mojom.h"
|
| +#include "mojo/services/network/public/interfaces/http_message.mojom.h"
|
| #include "mojo/services/network/public/interfaces/http_server.mojom.h"
|
| +#include "mojo/services/network/public/interfaces/net_address.mojom.h"
|
| #include "mojo/services/network/public/interfaces/network_service.mojom.h"
|
| +#include "mojo/services/network/public/interfaces/web_socket.mojom.h"
|
| #include "net/base/io_buffer.h"
|
| #include "net/base/net_errors.h"
|
| #include "net/base/test_completion_callback.h"
|
| @@ -261,6 +269,141 @@ class TestHttpClient {
|
| DISALLOW_COPY_AND_ASSIGN(TestHttpClient);
|
| };
|
|
|
| +class WebSocketClientImpl : public WebSocketClient {
|
| + public:
|
| + explicit WebSocketClientImpl()
|
| + : binding_(this, &client_ptr_),
|
| + wait_for_message_count_(0),
|
| + run_loop_(nullptr) {}
|
| + ~WebSocketClientImpl() override {}
|
| +
|
| + // Establishes a connection from the client side.
|
| + void Connect(WebSocketPtr web_socket, const std::string& url) {
|
| + web_socket_ = web_socket.Pass();
|
| +
|
| + DataPipe data_pipe;
|
| + send_stream_ = data_pipe.producer_handle.Pass();
|
| + write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get()));
|
| +
|
| + web_socket_->Connect(url, Array<String>(0), "http://example.com",
|
| + data_pipe.consumer_handle.Pass(), client_ptr_.Pass());
|
| + }
|
| +
|
| + // Establishes a connection from the server side.
|
| + void AcceptConnectRequest(
|
| + const HttpConnectionDelegate::OnReceivedWebSocketRequestCallback&
|
| + callback) {
|
| + InterfaceRequest<WebSocket> web_socket_request = GetProxy(&web_socket_);
|
| +
|
| + DataPipe data_pipe;
|
| + send_stream_ = data_pipe.producer_handle.Pass();
|
| + write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get()));
|
| +
|
| + callback.Run(web_socket_request.Pass(), data_pipe.consumer_handle.Pass(),
|
| + client_ptr_.Pass());
|
| + }
|
| +
|
| + void WaitForConnectCompletion() {
|
| + DCHECK(!run_loop_);
|
| +
|
| + if (receive_stream_.is_valid())
|
| + return;
|
| +
|
| + base::RunLoop run_loop;
|
| + run_loop_ = &run_loop;
|
| + run_loop.Run();
|
| + run_loop_ = nullptr;
|
| + }
|
| +
|
| + void Send(const std::string& message) {
|
| + DCHECK(!message.empty());
|
| +
|
| + uint32_t size = static_cast<uint32_t>(message.size());
|
| + write_send_stream_->Write(
|
| + &message[0], size,
|
| + base::Bind(&WebSocketClientImpl::OnFinishedWritingSendStream,
|
| + base::Unretained(this), size));
|
| + }
|
| +
|
| + void WaitForMessage(size_t count) {
|
| + DCHECK(!run_loop_);
|
| +
|
| + if (received_messages_.size() >= count)
|
| + return;
|
| + wait_for_message_count_ = count;
|
| + base::RunLoop run_loop;
|
| + run_loop_ = &run_loop;
|
| + run_loop.Run();
|
| + run_loop_ = nullptr;
|
| + }
|
| +
|
| + std::vector<std::string>& received_messages() { return received_messages_; }
|
| +
|
| + private:
|
| + // WebSocketClient implementation.
|
| + void DidConnect(const String& selected_subprotocol,
|
| + const String& extensions,
|
| + ScopedDataPipeConsumerHandle receive_stream) override {
|
| + receive_stream_ = receive_stream.Pass();
|
| + read_receive_stream_.reset(new WebSocketReadQueue(receive_stream_.get()));
|
| +
|
| + web_socket_->FlowControl(2048);
|
| + if (run_loop_)
|
| + run_loop_->Quit();
|
| + }
|
| +
|
| + void DidReceiveData(bool fin,
|
| + WebSocket::MessageType type,
|
| + uint32_t num_bytes) override {
|
| + DCHECK(num_bytes > 0);
|
| +
|
| + read_receive_stream_->Read(
|
| + num_bytes,
|
| + base::Bind(&WebSocketClientImpl::OnFinishedReadingReceiveStream,
|
| + base::Unretained(this), num_bytes));
|
| + }
|
| +
|
| + void DidReceiveFlowControl(int64_t quota) override {}
|
| +
|
| + void DidFail(const String& message) override {}
|
| +
|
| + void DidClose(bool was_clean, uint16_t code, const String& reason) override {}
|
| +
|
| + void OnFinishedWritingSendStream(uint32_t num_bytes, const char* buffer) {
|
| + EXPECT_TRUE(buffer);
|
| +
|
| + web_socket_->Send(true, WebSocket::MESSAGE_TYPE_TEXT, num_bytes);
|
| + }
|
| +
|
| + void OnFinishedReadingReceiveStream(uint32_t num_bytes, const char* data) {
|
| + EXPECT_TRUE(data);
|
| +
|
| + received_messages_.push_back(std::string(data, num_bytes));
|
| + if (run_loop_ && received_messages_.size() >= wait_for_message_count_) {
|
| + wait_for_message_count_ = 0;
|
| + run_loop_->Quit();
|
| + }
|
| + }
|
| +
|
| + WebSocketClientPtr client_ptr_;
|
| + Binding<WebSocketClient> binding_;
|
| + WebSocketPtr web_socket_;
|
| +
|
| + ScopedDataPipeProducerHandle send_stream_;
|
| + scoped_ptr<WebSocketWriteQueue> write_send_stream_;
|
| +
|
| + ScopedDataPipeConsumerHandle receive_stream_;
|
| + scoped_ptr<WebSocketReadQueue> read_receive_stream_;
|
| +
|
| + std::vector<std::string> received_messages_;
|
| + size_t wait_for_message_count_;
|
| +
|
| + // Pointing to a stack-allocated RunLoop instance.
|
| + base::RunLoop* run_loop_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl);
|
| +};
|
| +
|
| class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
|
| public:
|
| struct PendingRequest {
|
| @@ -272,8 +415,8 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
|
| InterfaceRequest<HttpConnectionDelegate> request)
|
| : connection_(connection.Pass()),
|
| binding_(this, request.Pass()),
|
| - run_loop_(nullptr),
|
| - wait_for_request_count_(0) {}
|
| + wait_for_request_count_(0),
|
| + run_loop_(nullptr) {}
|
| ~HttpConnectionDelegateImpl() override {}
|
|
|
| // HttpConnectionDelegate implementation:
|
| @@ -292,7 +435,12 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
|
| void OnReceivedWebSocketRequest(
|
| HttpRequestPtr request,
|
| const OnReceivedWebSocketRequestCallback& callback) override {
|
| - NOTREACHED();
|
| + web_socket_.reset(new WebSocketClientImpl());
|
| +
|
| + web_socket_->AcceptConnectRequest(callback);
|
| +
|
| + if (run_loop_)
|
| + run_loop_->Quit();
|
| }
|
|
|
| void SendResponse(HttpResponsePtr response) {
|
| @@ -305,6 +453,9 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
|
| void WaitForRequest(size_t count) {
|
| DCHECK(!run_loop_);
|
|
|
| + if (pending_requests_.size() >= count)
|
| + return;
|
| +
|
| wait_for_request_count_ = count;
|
| base::RunLoop run_loop;
|
| run_loop_ = &run_loop;
|
| @@ -312,17 +463,33 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
|
| run_loop_ = nullptr;
|
| }
|
|
|
| + void WaitForWebSocketRequest() {
|
| + DCHECK(!run_loop_);
|
| +
|
| + if (web_socket_)
|
| + return;
|
| +
|
| + base::RunLoop run_loop;
|
| + run_loop_ = &run_loop;
|
| + run_loop.Run();
|
| + run_loop_ = nullptr;
|
| + }
|
| +
|
| std::vector<linked_ptr<PendingRequest>>& pending_requests() {
|
| return pending_requests_;
|
| }
|
|
|
| + WebSocketClientImpl* web_socket() { return web_socket_.get(); }
|
| +
|
| private:
|
| HttpConnectionPtr connection_;
|
| Binding<HttpConnectionDelegate> binding_;
|
| std::vector<linked_ptr<PendingRequest>> pending_requests_;
|
| + size_t wait_for_request_count_;
|
| + scoped_ptr<WebSocketClientImpl> web_socket_;
|
| +
|
| // Pointing to a stack-allocated RunLoop instance.
|
| base::RunLoop* run_loop_;
|
| - size_t wait_for_request_count_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl);
|
| };
|
| @@ -331,8 +498,8 @@ class HttpServerDelegateImpl : public HttpServerDelegate {
|
| public:
|
| explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr)
|
| : binding_(this, delegate_ptr),
|
| - run_loop_(nullptr),
|
| - wait_for_connection_count_(0) {}
|
| + wait_for_connection_count_(0),
|
| + run_loop_(nullptr) {}
|
| ~HttpServerDelegateImpl() override {}
|
|
|
| // HttpServerDelegate implementation.
|
| @@ -349,6 +516,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate {
|
| void WaitForConnection(size_t count) {
|
| DCHECK(!run_loop_);
|
|
|
| + if (connections_.size() >= count)
|
| + return;
|
| +
|
| wait_for_connection_count_ = count;
|
| base::RunLoop run_loop;
|
| run_loop_ = &run_loop;
|
| @@ -363,9 +533,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate {
|
| private:
|
| Binding<HttpServerDelegate> binding_;
|
| std::vector<linked_ptr<HttpConnectionDelegateImpl>> connections_;
|
| + size_t wait_for_connection_count_;
|
| // Pointing to a stack-allocated RunLoop instance.
|
| base::RunLoop* run_loop_;
|
| - size_t wait_for_connection_count_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl);
|
| };
|
| @@ -478,4 +648,42 @@ TEST_F(HttpServerAppTest, HttpRequestResponseWithBody) {
|
| CheckResponse(response_data, response_message);
|
| }
|
|
|
| +TEST_F(HttpServerAppTest, WebSocket) {
|
| + NetAddressPtr bound_to;
|
| + HttpServerDelegatePtr server_delegate_ptr;
|
| + HttpServerDelegateImpl server_delegate_impl(&server_delegate_ptr);
|
| + CreateHttpServer(server_delegate_ptr.Pass(), &bound_to);
|
| +
|
| + WebSocketPtr web_socket_ptr;
|
| + network_service_->CreateWebSocket(GetProxy(&web_socket_ptr));
|
| + WebSocketClientImpl socket_0;
|
| + socket_0.Connect(
|
| + web_socket_ptr.Pass(),
|
| + base::StringPrintf("ws://127.0.0.1:%d/hello", bound_to->ipv4->port));
|
| +
|
| + server_delegate_impl.WaitForConnection(1);
|
| + HttpConnectionDelegateImpl& connection =
|
| + *server_delegate_impl.connections()[0];
|
| +
|
| + connection.WaitForWebSocketRequest();
|
| + WebSocketClientImpl& socket_1 = *connection.web_socket();
|
| +
|
| + socket_1.WaitForConnectCompletion();
|
| + socket_0.WaitForConnectCompletion();
|
| +
|
| + socket_0.Send("Hello");
|
| + socket_0.Send("world!");
|
| +
|
| + socket_1.WaitForMessage(2);
|
| + EXPECT_EQ("Hello", socket_1.received_messages()[0]);
|
| + EXPECT_EQ("world!", socket_1.received_messages()[1]);
|
| +
|
| + socket_1.Send("How do");
|
| + socket_1.Send("you do?");
|
| +
|
| + socket_0.WaitForMessage(2);
|
| + EXPECT_EQ("How do", socket_0.received_messages()[0]);
|
| + EXPECT_EQ("you do?", socket_0.received_messages()[1]);
|
| +}
|
| +
|
| } // namespace mojo
|
|
|