Index: mojo/services/network/http_server_apptest.cc |
diff --git a/mojo/services/network/http_server_apptest.cc b/mojo/services/network/http_server_apptest.cc |
index a17fcf137480e277984e3c7c209e5716c0482df0..f46d1ebac976e2d9781ce40abe65abe9b980a590 100644 |
--- a/mojo/services/network/http_server_apptest.cc |
+++ b/mojo/services/network/http_server_apptest.cc |
@@ -2,19 +2,27 @@ |
// Use of this source code is governed by a BSD-style license that can be |
// found in the LICENSE file. |
+#include "base/logging.h" |
#include "base/macros.h" |
#include "base/memory/linked_ptr.h" |
#include "base/memory/ref_counted.h" |
#include "base/memory/scoped_ptr.h" |
#include "base/run_loop.h" |
#include "base/strings/string_util.h" |
+#include "base/strings/stringprintf.h" |
#include "mojo/application/public/cpp/application_connection.h" |
#include "mojo/application/public/cpp/application_impl.h" |
#include "mojo/application/public/cpp/application_test_base.h" |
#include "mojo/common/data_pipe_utils.h" |
#include "mojo/services/network/net_address_type_converters.h" |
+#include "mojo/services/network/public/cpp/web_socket_read_queue.h" |
+#include "mojo/services/network/public/cpp/web_socket_write_queue.h" |
+#include "mojo/services/network/public/interfaces/http_connection.mojom.h" |
+#include "mojo/services/network/public/interfaces/http_message.mojom.h" |
#include "mojo/services/network/public/interfaces/http_server.mojom.h" |
+#include "mojo/services/network/public/interfaces/net_address.mojom.h" |
#include "mojo/services/network/public/interfaces/network_service.mojom.h" |
+#include "mojo/services/network/public/interfaces/web_socket.mojom.h" |
#include "net/base/io_buffer.h" |
#include "net/base/net_errors.h" |
#include "net/base/test_completion_callback.h" |
@@ -261,6 +269,141 @@ class TestHttpClient { |
DISALLOW_COPY_AND_ASSIGN(TestHttpClient); |
}; |
+class WebSocketClientImpl : public WebSocketClient { |
+ public: |
+ explicit WebSocketClientImpl() |
+ : binding_(this, &client_ptr_), |
+ wait_for_message_count_(0), |
+ run_loop_(nullptr) {} |
+ ~WebSocketClientImpl() override {} |
+ |
+ // Establishes a connection from the client side. |
+ void Connect(WebSocketPtr web_socket, const std::string& url) { |
+ web_socket_ = web_socket.Pass(); |
+ |
+ DataPipe data_pipe; |
+ send_stream_ = data_pipe.producer_handle.Pass(); |
+ write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get())); |
+ |
+ web_socket_->Connect(url, Array<String>(0), "http://example.com", |
+ data_pipe.consumer_handle.Pass(), client_ptr_.Pass()); |
+ } |
+ |
+ // Establishes a connection from the server side. |
+ void AcceptConnectRequest( |
+ const HttpConnectionDelegate::OnReceivedWebSocketRequestCallback& |
+ callback) { |
+ InterfaceRequest<WebSocket> web_socket_request = GetProxy(&web_socket_); |
+ |
+ DataPipe data_pipe; |
+ send_stream_ = data_pipe.producer_handle.Pass(); |
+ write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get())); |
+ |
+ callback.Run(web_socket_request.Pass(), data_pipe.consumer_handle.Pass(), |
+ client_ptr_.Pass()); |
+ } |
+ |
+ void WaitForConnectCompletion() { |
+ DCHECK(!run_loop_); |
+ |
+ if (receive_stream_.is_valid()) |
+ return; |
+ |
+ base::RunLoop run_loop; |
+ run_loop_ = &run_loop; |
+ run_loop.Run(); |
+ run_loop_ = nullptr; |
+ } |
+ |
+ void Send(const std::string& message) { |
+ DCHECK(!message.empty()); |
+ |
+ uint32_t size = static_cast<uint32_t>(message.size()); |
+ write_send_stream_->Write( |
+ &message[0], size, |
+ base::Bind(&WebSocketClientImpl::OnFinishedWritingSendStream, |
+ base::Unretained(this), size)); |
+ } |
+ |
+ void WaitForMessage(size_t count) { |
+ DCHECK(!run_loop_); |
+ |
+ if (received_messages_.size() >= count) |
+ return; |
+ wait_for_message_count_ = count; |
+ base::RunLoop run_loop; |
+ run_loop_ = &run_loop; |
+ run_loop.Run(); |
+ run_loop_ = nullptr; |
+ } |
+ |
+ std::vector<std::string>& received_messages() { return received_messages_; } |
+ |
+ private: |
+ // WebSocketClient implementation. |
+ void DidConnect(const String& selected_subprotocol, |
+ const String& extensions, |
+ ScopedDataPipeConsumerHandle receive_stream) override { |
+ receive_stream_ = receive_stream.Pass(); |
+ read_receive_stream_.reset(new WebSocketReadQueue(receive_stream_.get())); |
+ |
+ web_socket_->FlowControl(2048); |
+ if (run_loop_) |
+ run_loop_->Quit(); |
+ } |
+ |
+ void DidReceiveData(bool fin, |
+ WebSocket::MessageType type, |
+ uint32_t num_bytes) override { |
+ DCHECK(num_bytes > 0); |
+ |
+ read_receive_stream_->Read( |
+ num_bytes, |
+ base::Bind(&WebSocketClientImpl::OnFinishedReadingReceiveStream, |
+ base::Unretained(this), num_bytes)); |
+ } |
+ |
+ void DidReceiveFlowControl(int64_t quota) override {} |
+ |
+ void DidFail(const String& message) override {} |
+ |
+ void DidClose(bool was_clean, uint16_t code, const String& reason) override {} |
+ |
+ void OnFinishedWritingSendStream(uint32_t num_bytes, const char* buffer) { |
+ EXPECT_TRUE(buffer); |
+ |
+ web_socket_->Send(true, WebSocket::MESSAGE_TYPE_TEXT, num_bytes); |
+ } |
+ |
+ void OnFinishedReadingReceiveStream(uint32_t num_bytes, const char* data) { |
+ EXPECT_TRUE(data); |
+ |
+ received_messages_.push_back(std::string(data, num_bytes)); |
+ if (run_loop_ && received_messages_.size() >= wait_for_message_count_) { |
+ wait_for_message_count_ = 0; |
+ run_loop_->Quit(); |
+ } |
+ } |
+ |
+ WebSocketClientPtr client_ptr_; |
+ Binding<WebSocketClient> binding_; |
+ WebSocketPtr web_socket_; |
+ |
+ ScopedDataPipeProducerHandle send_stream_; |
+ scoped_ptr<WebSocketWriteQueue> write_send_stream_; |
+ |
+ ScopedDataPipeConsumerHandle receive_stream_; |
+ scoped_ptr<WebSocketReadQueue> read_receive_stream_; |
+ |
+ std::vector<std::string> received_messages_; |
+ size_t wait_for_message_count_; |
+ |
+ // Pointing to a stack-allocated RunLoop instance. |
+ base::RunLoop* run_loop_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl); |
+}; |
+ |
class HttpConnectionDelegateImpl : public HttpConnectionDelegate { |
public: |
struct PendingRequest { |
@@ -272,8 +415,8 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { |
InterfaceRequest<HttpConnectionDelegate> request) |
: connection_(connection.Pass()), |
binding_(this, request.Pass()), |
- run_loop_(nullptr), |
- wait_for_request_count_(0) {} |
+ wait_for_request_count_(0), |
+ run_loop_(nullptr) {} |
~HttpConnectionDelegateImpl() override {} |
// HttpConnectionDelegate implementation: |
@@ -292,7 +435,12 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { |
void OnReceivedWebSocketRequest( |
HttpRequestPtr request, |
const OnReceivedWebSocketRequestCallback& callback) override { |
- NOTREACHED(); |
+ web_socket_.reset(new WebSocketClientImpl()); |
+ |
+ web_socket_->AcceptConnectRequest(callback); |
+ |
+ if (run_loop_) |
+ run_loop_->Quit(); |
} |
void SendResponse(HttpResponsePtr response) { |
@@ -305,6 +453,9 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { |
void WaitForRequest(size_t count) { |
DCHECK(!run_loop_); |
+ if (pending_requests_.size() >= count) |
+ return; |
+ |
wait_for_request_count_ = count; |
base::RunLoop run_loop; |
run_loop_ = &run_loop; |
@@ -312,17 +463,33 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { |
run_loop_ = nullptr; |
} |
+ void WaitForWebSocketRequest() { |
+ DCHECK(!run_loop_); |
+ |
+ if (web_socket_) |
+ return; |
+ |
+ base::RunLoop run_loop; |
+ run_loop_ = &run_loop; |
+ run_loop.Run(); |
+ run_loop_ = nullptr; |
+ } |
+ |
std::vector<linked_ptr<PendingRequest>>& pending_requests() { |
return pending_requests_; |
} |
+ WebSocketClientImpl* web_socket() { return web_socket_.get(); } |
+ |
private: |
HttpConnectionPtr connection_; |
Binding<HttpConnectionDelegate> binding_; |
std::vector<linked_ptr<PendingRequest>> pending_requests_; |
+ size_t wait_for_request_count_; |
+ scoped_ptr<WebSocketClientImpl> web_socket_; |
+ |
// Pointing to a stack-allocated RunLoop instance. |
base::RunLoop* run_loop_; |
- size_t wait_for_request_count_; |
DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl); |
}; |
@@ -331,8 +498,8 @@ class HttpServerDelegateImpl : public HttpServerDelegate { |
public: |
explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr) |
: binding_(this, delegate_ptr), |
- run_loop_(nullptr), |
- wait_for_connection_count_(0) {} |
+ wait_for_connection_count_(0), |
+ run_loop_(nullptr) {} |
~HttpServerDelegateImpl() override {} |
// HttpServerDelegate implementation. |
@@ -349,6 +516,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate { |
void WaitForConnection(size_t count) { |
DCHECK(!run_loop_); |
+ if (connections_.size() >= count) |
+ return; |
+ |
wait_for_connection_count_ = count; |
base::RunLoop run_loop; |
run_loop_ = &run_loop; |
@@ -363,9 +533,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate { |
private: |
Binding<HttpServerDelegate> binding_; |
std::vector<linked_ptr<HttpConnectionDelegateImpl>> connections_; |
+ size_t wait_for_connection_count_; |
// Pointing to a stack-allocated RunLoop instance. |
base::RunLoop* run_loop_; |
- size_t wait_for_connection_count_; |
DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl); |
}; |
@@ -478,4 +648,42 @@ TEST_F(HttpServerAppTest, HttpRequestResponseWithBody) { |
CheckResponse(response_data, response_message); |
} |
+TEST_F(HttpServerAppTest, WebSocket) { |
+ NetAddressPtr bound_to; |
+ HttpServerDelegatePtr server_delegate_ptr; |
+ HttpServerDelegateImpl server_delegate_impl(&server_delegate_ptr); |
+ CreateHttpServer(server_delegate_ptr.Pass(), &bound_to); |
+ |
+ WebSocketPtr web_socket_ptr; |
+ network_service_->CreateWebSocket(GetProxy(&web_socket_ptr)); |
+ WebSocketClientImpl socket_0; |
+ socket_0.Connect( |
+ web_socket_ptr.Pass(), |
+ base::StringPrintf("ws://127.0.0.1:%d/hello", bound_to->ipv4->port)); |
+ |
+ server_delegate_impl.WaitForConnection(1); |
+ HttpConnectionDelegateImpl& connection = |
+ *server_delegate_impl.connections()[0]; |
+ |
+ connection.WaitForWebSocketRequest(); |
+ WebSocketClientImpl& socket_1 = *connection.web_socket(); |
+ |
+ socket_1.WaitForConnectCompletion(); |
+ socket_0.WaitForConnectCompletion(); |
+ |
+ socket_0.Send("Hello"); |
+ socket_0.Send("world!"); |
+ |
+ socket_1.WaitForMessage(2); |
+ EXPECT_EQ("Hello", socket_1.received_messages()[0]); |
+ EXPECT_EQ("world!", socket_1.received_messages()[1]); |
+ |
+ socket_1.Send("How do"); |
+ socket_1.Send("you do?"); |
+ |
+ socket_0.WaitForMessage(2); |
+ EXPECT_EQ("How do", socket_0.received_messages()[0]); |
+ EXPECT_EQ("you do?", socket_0.received_messages()[1]); |
+} |
+ |
} // namespace mojo |